Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: @llamaindex/postgres #1597

Merged
merged 23 commits into from
Jan 20, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ jobs:
done
- name: Pack provider packages
run: |
for dir in packages/providers/*; do
for dir in packages/providers/* packages/providers/storage/*; do
if [ -d "$dir" ] && [ -f "$dir/package.json" ]; then
echo "Packing $dir"
pnpm pack --pack-destination ${{ runner.temp }} -C $dir
Expand Down
1 change: 1 addition & 0 deletions packages/llamaindex/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
"@llamaindex/readers": "workspace:*",
"@llamaindex/replicate": "workspace:*",
"@llamaindex/vllm": "workspace:*",
"@llamaindex/postgres": "workspace:*",
"@mistralai/mistralai": "^1.3.4",
"@mixedbread-ai/sdk": "^2.2.11",
"@pinecone-database/pinecone": "^4.0.0",
Expand Down
64 changes: 64 additions & 0 deletions packages/providers/storage/postgres/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
{
"name": "@llamaindex/postgres",
"description": "PostgreSQL Storage for LlamaIndex",
"version": "0.0.29",
"type": "module",
"main": "./dist/index.cjs",
"module": "./dist/index.js",
"exports": {
".": {
"edge-light": {
"types": "./dist/index.edge-light.d.ts",
"default": "./dist/index.edge-light.js"
},
"workerd": {
"types": "./dist/index.edge-light.d.ts",
"default": "./dist/index.edge-light.js"
},
"require": {
"types": "./dist/index.d.cts",
"default": "./dist/index.cjs"
},
"import": {
"types": "./dist/index.d.ts",
"default": "./dist/index.js"
}
}
},
"files": [
"dist"
],
"repository": {
"type": "git",
"url": "https://github.com/run-llama/LlamaIndexTS.git",
"directory": "packages/providers/storage/postgres"
},
"scripts": {
"build": "bunchee",
"dev": "bunchee --watch"
},
"devDependencies": {
"bunchee": "6.2.0",
"pg": "^8.12.0",
"pgvector": "0.2.0",
"@types/pg": "^8.11.8"
},
"dependencies": {
"@llamaindex/core": "workspace:*",
"@llamaindex/env": "workspace:*",
"pg": "^8.11.3",
"pg-promise": "^11.5.4"
},
"peerDependencies": {
"pg": "^8.12.0",
"pgvector": "0.2.0"
},
"peerDependenciesMeta": {
"pg": {
"optional": true
},
"pgvector": {
"optional": true
}
}
}
30 changes: 30 additions & 0 deletions packages/providers/storage/postgres/src/PostgresDocumentStore.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import { DEFAULT_NAMESPACE } from "@llamaindex/core/global";
import {
  KVDocumentStore,
  noneSerializer,
} from "@llamaindex/core/storage/doc-store";
import { PostgresKVStore, type PostgresKVStoreConfig } from "./PostgresKVStore";

// Table name used by PostgresDocumentStore when the config does not supply a (non-empty) tableName.
const DEFAULT_TABLE_NAME = "llamaindex_doc_store";

/**
 * Configuration for PostgresDocumentStore: every PostgresKVStore option plus
 * an optional namespace. When the namespace is omitted (or empty) the store
 * falls back to DEFAULT_NAMESPACE.
 */
export type PostgresDocumentStoreConfig = PostgresKVStoreConfig & {
namespace?: string;
};

/**
 * Document store that keeps its data in PostgreSQL through a PostgresKVStore.
 *
 * The constructor only builds the underlying key-value store and hands it,
 * together with the namespace, to KVDocumentStore.
 */
export class PostgresDocumentStore extends KVDocumentStore {
  // Values are handed to the key-value store through the no-op serializer.
  serializer = noneSerializer;

  /**
   * @param config Optional connection, schema/table and namespace settings.
   *   - `tableName` falls back to "llamaindex_doc_store" when missing or empty.
   *   - `namespace` falls back to DEFAULT_NAMESPACE when missing or empty.
   *   - When a `client` is supplied, `shouldConnect` defaults to `false` here.
   */
  constructor(config?: PostgresDocumentStoreConfig) {
    // Forward exactly one connection flavour (or none) to the KV store,
    // chosen by which key is present on the incoming config.
    let connection: PostgresKVStoreConfig = {};
    if (config && "clientConfig" in config) {
      connection = { clientConfig: config.clientConfig };
    } else if (config && "client" in config) {
      connection = {
        client: config.client,
        shouldConnect: config.shouldConnect ?? false,
      };
    }

    const kvStore = new PostgresKVStore({
      schemaName: config?.schemaName,
      tableName: config?.tableName || DEFAULT_TABLE_NAME,
      ...connection,
    });
    const namespace = config?.namespace || DEFAULT_NAMESPACE;
    super(kvStore, namespace);
  }
}
28 changes: 28 additions & 0 deletions packages/providers/storage/postgres/src/PostgresIndexStore.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import { DEFAULT_NAMESPACE } from "@llamaindex/core/global";
import { KVIndexStore } from "@llamaindex/core/storage/index-store";
import { PostgresKVStore, type PostgresKVStoreConfig } from "./PostgresKVStore";

// Table name used by PostgresIndexStore when the config does not supply a (non-empty) tableName.
const DEFAULT_TABLE_NAME = "llamaindex_index_store";

/**
 * Configuration for PostgresIndexStore: every PostgresKVStore option plus an
 * optional namespace. When the namespace is omitted (or empty) the store
 * falls back to DEFAULT_NAMESPACE.
 */
export type PostgresIndexStoreConfig = PostgresKVStoreConfig & {
namespace?: string;
};

/**
 * Index store that keeps its data in PostgreSQL through a PostgresKVStore.
 *
 * The constructor only builds the underlying key-value store and hands it,
 * together with the namespace, to KVIndexStore.
 */
export class PostgresIndexStore extends KVIndexStore {
  /**
   * @param config Optional connection, schema/table and namespace settings.
   *   - `tableName` falls back to "llamaindex_index_store" when missing or empty.
   *   - `namespace` falls back to DEFAULT_NAMESPACE when missing or empty.
   *   - When a `client` is supplied, `shouldConnect` defaults to `false` here.
   */
  constructor(config?: PostgresIndexStoreConfig) {
    // Pick the connection flavour by which key is present on the config;
    // with neither key present nothing connection-related is forwarded.
    let connection: PostgresKVStoreConfig = {};
    if (config && "clientConfig" in config) {
      connection = { clientConfig: config.clientConfig };
    } else if (config && "client" in config) {
      connection = {
        client: config.client,
        shouldConnect: config.shouldConnect ?? false,
      };
    }

    super(
      new PostgresKVStore({
        schemaName: config?.schemaName,
        tableName: config?.tableName || DEFAULT_TABLE_NAME,
        ...connection,
      }),
      config?.namespace || DEFAULT_NAMESPACE,
    );
  }
}
173 changes: 173 additions & 0 deletions packages/providers/storage/postgres/src/PostgresKVStore.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
import { DEFAULT_COLLECTION } from "@llamaindex/core/global";
import type { StoredValue } from "@llamaindex/core/schema";
import { BaseKVStore } from "@llamaindex/core/storage/kv-store";
import type pg from "pg";

/**
 * Shape declared as the return type of PostgresKVStore.getAll: a string-keyed
 * map whose entries are themselves string-keyed maps of stored values.
 */
export type DataType = Record<string, Record<string, StoredValue>>;

// Fallbacks applied by PostgresKVStore when the config leaves schemaName / tableName unset (or empty).
const DEFAULT_SCHEMA_NAME = "public";
const DEFAULT_TABLE_NAME = "llamaindex_kv_store";

/**
 * Where the key-value table lives. Both fields are optional; PostgresKVStore
 * substitutes "public" and "llamaindex_kv_store" respectively when they are
 * missing or empty.
 */
export type PostgresKVStoreBaseConfig = {
schemaName?: string | undefined;
tableName?: string | undefined;
};

/**
 * How the store obtains its database connection: either options from which it
 * creates its own pg client, or a client supplied by the caller.
 */
export type PostgresKVStoreClientConfig =
| {
/**
* Client configuration options for the pg client.
*
* {@link https://node-postgres.com/apis/client#new-client PostgreSQL Client API}
*/
clientConfig?: pg.ClientConfig | undefined;
}
| {
/**
* Whether PostgresKVStore should call `connect()` on the supplied client before first use.
* When omitted or `true`, the store connects the client itself;
* when `false`, the store treats the client as already connected.
*/
shouldConnect?: boolean | undefined;
/**
* A pg client or pool client instance.
* Unless `shouldConnect` is `false`, make sure it is not connected to the database yet, or it will throw an error.
*/
client?: pg.Client | pg.PoolClient;
};

/**
 * Full PostgresKVStore configuration: table location options combined with
 * one of the two connection flavours.
 */
export type PostgresKVStoreConfig = PostgresKVStoreBaseConfig &
PostgresKVStoreClientConfig;

/**
 * Key-value store backed by a single PostgreSQL table.
 *
 * Each entry is a row of `(collection, key, value)` where `value` is JSONB.
 * The schema, table and indexes are created on first use if they do not exist.
 *
 * NOTE: `schemaName` and `tableName` are interpolated into SQL text as
 * identifiers without quoting or escaping, so they must come from trusted
 * configuration, never from end-user input. Keys, collections and values are
 * always sent as bound parameters.
 */
export class PostgresKVStore extends BaseKVStore {
  private schemaName: string;
  private tableName: string;

  private isDBConnected: boolean = false;
  private clientConfig: pg.ClientConfig | undefined = undefined;
  private db?: pg.ClientBase | undefined = undefined;
  // Guards so the per-client setup in getDb() happens once, not on every call.
  private isEndListenerAttached = false;
  private isSchemaChecked = false;

  /**
   * @param config Optional table location and connection settings.
   *   With `clientConfig`, the store creates and connects its own pg client
   *   lazily. With `client`, the store uses that client and connects it on
   *   first use unless `shouldConnect` is explicitly `false`.
   */
  constructor(config?: PostgresKVStoreConfig) {
    super();
    this.schemaName = config?.schemaName || DEFAULT_SCHEMA_NAME;
    this.tableName = config?.tableName || DEFAULT_TABLE_NAME;
    if (config) {
      if ("clientConfig" in config) {
        this.clientConfig = config.clientConfig;
      } else if ("client" in config) {
        // shouldConnect === false means "already connected, do not connect".
        this.isDBConnected =
          config?.shouldConnect !== undefined ? !config.shouldConnect : false;
        this.db = config.client;
      }
    }
  }

  /**
   * Returns a connected client whose schema has been verified.
   *
   * Creates the client on first use when none was supplied, connects it when
   * needed, and performs the one-time listener registration and schema check.
   */
  private async getDb(): Promise<pg.ClientBase> {
    if (!this.db) {
      // pg is loaded lazily; handle both ESM-default and CJS-shaped modules.
      const pgModule = await import("pg");
      const { Client } = pgModule.default ? pgModule.default : pgModule;
      const created = new Client({ ...this.clientConfig });
      await created.connect();
      this.isDBConnected = true;
      this.db = created;
    }
    const db = this.db;
    if (!this.isDBConnected) {
      await db.connect();
      this.isDBConnected = true;
    }
    if (!this.isEndListenerAttached) {
      // Register once: adding a listener on every call would accumulate
      // handlers on the client for each store operation.
      db.on("end", () => {
        this.isDBConnected = false;
      });
      this.isEndListenerAttached = true;
    }
    if (!this.isSchemaChecked) {
      // The DDL is idempotent but costs several round-trips, so run it only
      // until it has succeeded once for this store instance.
      await this.checkSchema(db);
      this.isSchemaChecked = true;
    }
    return db;
  }

  /**
   * Creates the schema, the key-value table and its two lookup indexes when
   * they do not exist yet.
   *
   * @param db Connected client to run the DDL on.
   * @returns The same client, for chaining.
   */
  private async checkSchema(db: pg.ClientBase) {
    await db.query(`CREATE SCHEMA IF NOT EXISTS ${this.schemaName}`);
    const tbl = `CREATE TABLE IF NOT EXISTS ${this.schemaName}.${this.tableName} (
      id uuid DEFAULT gen_random_uuid() PRIMARY KEY,
      collection VARCHAR,
      key VARCHAR,
      value JSONB DEFAULT '{}'
    )`;
    await db.query(tbl);
    const idxs = `CREATE INDEX IF NOT EXISTS idx_${this.tableName}_collection ON ${this.schemaName}.${this.tableName} (collection);
      CREATE INDEX IF NOT EXISTS idx_${this.tableName}_key ON ${this.schemaName}.${this.tableName} (key);`;
    await db.query(idxs);
    return db;
  }

  /**
   * Runs `work` between BEGIN and COMMIT on the store's client.
   *
   * On any failure a ROLLBACK is issued and the original error is rethrown.
   *
   * @param work Callback receiving the connected client.
   * @returns Whatever `work` resolves to.
   */
  private async runInTransaction<T>(
    work: (db: pg.ClientBase) => Promise<T>,
  ): Promise<T> {
    const db = await this.getDb();
    try {
      await db.query("BEGIN");
      const outcome = await work(db);
      await db.query("COMMIT");
      return outcome;
    } catch (error) {
      await db.query("ROLLBACK");
      throw error;
    }
  }

  /**
   * Exposes the underlying connected client (creating/connecting it if needed).
   */
  client() {
    return this.getDb();
  }

  /**
   * Stores `val` under `key` in `collection`, replacing any existing value.
   *
   * The table's only unique column is the generated `id`, so an
   * `ON CONFLICT (id)` upsert can never match an existing key and would insert
   * a duplicate row on every call. Instead the existing row(s) are updated
   * first and a new row is inserted only when nothing was updated.
   *
   * @param key Entry key.
   * @param val Value to store in the JSONB column.
   * @param collection Collection the key belongs to.
   */
  async put(
    key: string,
    val: StoredValue,
    collection: string = DEFAULT_COLLECTION,
  ): Promise<void> {
    await this.runInTransaction(async (db) => {
      const table = `${this.schemaName}.${this.tableName}`;
      const values = [collection, key, val];
      const updated = await db.query(
        `UPDATE ${table} SET value = $3 WHERE collection = $1 AND key = $2`,
        values,
      );
      if (!updated.rowCount) {
        await db.query(
          `INSERT INTO ${table} (collection, key, value) VALUES ($1, $2, $3)`,
          values,
        );
      }
    });
  }

  /**
   * Reads the value stored under `key` in `collection`.
   *
   * @returns The stored value of the first matching row, or `undefined` when
   *   no row matches.
   */
  async get(key: string, collection: string = DEFAULT_COLLECTION) {
    return this.runInTransaction(async (db) => {
      const sql = `SELECT * FROM ${this.schemaName}.${this.tableName} WHERE key = $1 AND collection = $2`;
      const result = await db.query(sql, [key, collection]);
      return result.rows[0]?.value;
    });
  }

  /**
   * Reads every entry of `collection`.
   *
   * @returns An object mapping each row's key to its stored value.
   */
  async getAll(collection: string = DEFAULT_COLLECTION): Promise<DataType> {
    return this.runInTransaction(async (db) => {
      const sql = `SELECT * FROM ${this.schemaName}.${this.tableName} WHERE collection = $1`;
      const result = await db.query(sql, [collection]);
      const entries: DataType = {};
      for (const row of result.rows) {
        entries[row.key] = row.value;
      }
      return entries;
    });
  }

  /**
   * Deletes the entry stored under `key` in `collection`.
   *
   * @returns `true` when at least one row was removed, otherwise `false`.
   */
  async delete(
    key: string,
    collection: string = DEFAULT_COLLECTION,
  ): Promise<boolean> {
    return this.runInTransaction(async (db) => {
      const sql = `DELETE FROM ${this.schemaName}.${this.tableName} WHERE key = $1 AND collection = $2`;
      const result = await db.query(sql, [key, collection]);
      return (result.rowCount ?? 0) > 0;
    });
  }
}
3 changes: 3 additions & 0 deletions packages/providers/storage/postgres/src/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
export * from "./PostgresDocumentStore";
export * from "./PostgresIndexStore";
export * from "./PostgresKVStore";
19 changes: 19 additions & 0 deletions packages/providers/storage/postgres/tsconfig.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
{
"extends": "../../../../tsconfig.json",
"compilerOptions": {
"target": "ESNext",
"module": "ESNext",
"moduleResolution": "bundler",
"outDir": "./lib",
"tsBuildInfoFile": "./lib/.tsbuildinfo"
},
"include": ["./src"],
"references": [
{
"path": "../../../core/tsconfig.json"
},
{
"path": "../../../env/tsconfig.json"
}
]
}
Loading
Loading