From 5e2a59276706d3d3f00abdabb684d17ddd615d1e Mon Sep 17 00:00:00 2001 From: emekaokoli19 Date: Thu, 2 May 2024 09:21:15 +0100 Subject: [PATCH 1/5] testing --- src/vdf_io/export_vdf/weaviate_export.py | 99 +++++++++++++--- src/vdf_io/import_vdf/weaviate_import.py | 139 +++++++++++++++++++++++ 2 files changed, 224 insertions(+), 14 deletions(-) create mode 100644 src/vdf_io/import_vdf/weaviate_import.py diff --git a/src/vdf_io/export_vdf/weaviate_export.py b/src/vdf_io/export_vdf/weaviate_export.py index 518dd3c..0072fcd 100644 --- a/src/vdf_io/export_vdf/weaviate_export.py +++ b/src/vdf_io/export_vdf/weaviate_export.py @@ -2,14 +2,18 @@ from tqdm import tqdm import weaviate +import json from vdf_io.export_vdf.vdb_export_cls import ExportVDB +from vdf_io.meta_types import NamespaceMeta from vdf_io.names import DBNames from vdf_io.util import set_arg_from_input, set_arg_from_password +from typing import Dict, List # Set these environment variables URL = os.getenv("YOUR_WCS_URL") APIKEY = os.getenv("YOUR_WCS_API_KEY") +OPENAI_APIKEY = os.getenv("OPENAI_APIKEY") class ExportWeaviate(ExportVDB): @@ -23,6 +27,15 @@ def make_parser(cls, subparsers): parser_weaviate.add_argument("--url", type=str, help="URL of Weaviate instance") parser_weaviate.add_argument("--api_key", type=str, help="Weaviate API key") + parser_weaviate.add_argument("--openai_api_key", type=str, help="Openai API key") + parser_weaviate.add_arguments( + "--batch_size", type=int, help="batch size for fetching", + default=1000 + ) + parser_weaviate.add_argument( + "--connection-type", type=str, choices=["local", "cloud"], default="cloud", + help="Type of connection to Weaviate (local or cloud)" + ) parser_weaviate.add_argument( "--classes", type=str, help="Classes to export (comma-separated)" ) @@ -35,6 +48,12 @@ def export_vdb(cls, args): "Enter the URL of Weaviate instance: ", str, ) + set_arg_from_input( + args, + "connection_type", + "Enter 'local' or 'cloud' for connection types: ", + choices=['local', 'cloud'], + ) set_arg_from_password( args, "api_key", @@ -55,14 +74,20 @@ def export_vdb(cls, args): weaviate_export.get_data() return weaviate_export - # Connect to a WCS instance + # Connect to a WCS or local instance def __init__(self, args): super().__init__(args) - self.client = weaviate.connect_to_wcs( - cluster_url=self.args["url"], - auth_credentials=weaviate.auth.AuthApiKey(self.args["api_key"]), - skip_init_checks=True, - ) + if self.args["connection_type"] == "local": + self.client = weaviate.connect_to_local() + else: + self.client = weaviate.connect_to_wcs( + cluster_url=self.args["url"], + auth_credentials=weaviate.auth.AuthApiKey(self.args["api_key"]), + headers={'X-OpenAI-Api-key': self.args["openai_api_key"]} + if self.args["openai_api_key"] + else None, + skip_init_checks=True, + ) def get_index_names(self): if self.args.get("classes") is None: @@ -76,14 +101,60 @@ def get_index_names(self): return [c for c in self.all_classes if c in input_classes] def get_data(self): - # Get all objects of a class + # Get the index names to export index_names = self.get_index_names() - for class_name in index_names: - collection = self.client.collections.get(class_name) + index_metas: Dict[str, List[NamespaceMeta]] = {} + + # Iterate over index names and fetch data + for index_name in index_names: + collection = self.client.collections.get(index_name) response = collection.aggregate.over_all(total_count=True) - print(f"{response.total_count=}") + total_vector_count = response.total_count + + # Create vectors directory for this index + vectors_directory = self.create_vec_dir(index_name) + + # Export data in batches + batch_size = self.args["batch_size"] + num_batches = (total_vector_count + batch_size - 1) // batch_size + num_vectors_exported = 0 + + for batch_idx in tqdm(range(num_batches), desc=f"Exporting {index_name}"): + offset = batch_idx * batch_size + objects = collection.objects.limit(batch_size).offset(offset).get() + + # Extract vectors and metadata + vectors = {obj.id: obj.vector for obj in objects} + metadata = {} + # Need a better way + for obj in objects: + metadata[obj.id] = {attr: getattr(obj, attr) for attr in dir(obj) if not attr.startswith("__")} + + + # Save vectors and metadata to Parquet file + num_vectors_exported += self.save_vectors_to_parquet( + vectors, metadata, vectors_directory + ) + + # Create NamespaceMeta for this index + namespace_metas = [ + self.get_namespace_meta( + index_name, + vectors_directory, + total=total_vector_count, + num_vectors_exported=num_vectors_exported, + dim=300, # Not sure of the dimensions + distance="Cosine", + ) + ] + index_metas[index_name] = namespace_metas + + # Write VDFMeta to JSON file + self.file_structure.append(os.path.join(self.vdf_directory, "VDF_META.json")) + internal_metadata = self.get_basic_vdf_meta(index_metas) + meta_text = json.dumps(internal_metadata.model_dump(), indent=4) + tqdm.write(meta_text) + + print("Data export complete.") - # objects = self.client.query.get( - # wvq.Objects(wvq.Class(class_name)).with_limit(1000) - # ) - # print(objects) + return True diff --git a/src/vdf_io/import_vdf/weaviate_import.py b/src/vdf_io/import_vdf/weaviate_import.py new file mode 100644 index 0000000..374b713 --- /dev/null +++ b/src/vdf_io/import_vdf/weaviate_import.py @@ -0,0 +1,139 @@ +import os +import weaviate +import json +from tqdm import tqdm +from vdf_io.import_vdf.vdf_import_cls import ImportVDB +from vdf_io.names import DBNames +from vdf_io.util import set_arg_from_input, set_arg_from_password + +# Set these environment variables +URL = os.getenv("YOUR_WCS_URL") +APIKEY = os.getenv("YOUR_WCS_API_KEY") + + +class ImportWeaviate(ImportVDB): + DB_NAME_SLUG = DBNames.WEAVIATE + + @classmethod + def make_parser(cls, subparsers): + parser_weaviate = subparsers.add_parser( + cls.DB_NAME_SLUG, help="Import data into Weaviate" + ) + + parser_weaviate.add_argument("--url", type=str, help="URL of Weaviate instance") + parser_weaviate.add_argument("--api_key", type=str, help="Weaviate API key") + parser_weaviate.add_argument( + "--index_name", type=str, help="Name of the index in Weaviate" + ) + + @classmethod + def import_vdb(cls, args): + set_arg_from_input( + args, + "url", + "Enter the URL of Weaviate instance: ", + str, + ) + set_arg_from_password( + args, + "api_key", + "Enter the Weaviate API key: ", + "WEAVIATE_API_KEY", + ) + set_arg_from_input( + args, + "index_name", + "Enter the name of the index in Weaviate: ", + str, + ) + weaviate_import = ImportWeaviate(args) + weaviate_import.upsert_data() + return weaviate_import + + def __init__(self, args): + super().__init__(args) + if self.args["connection_type"] == "local": + self.client = weaviate.connect_to_local() + else: + self.client = weaviate.connect_to_wcs( + cluster_url=self.args["url"], + auth_credentials=weaviate.auth.AuthApiKey(self.args["api_key"]), + headers={'X-OpenAI-Api-key': self.args["openai_api_key"]} + if self.args["openai_api_key"] + else None, + skip_init_checks=True, + ) + + def upsert_data(self): + max_hit = False + total_imported_count = 0 + + # Iterate over the indexes and import the data + for index_name, index_meta in tqdm(self.vdf_meta["indexes"].items(), desc="Importing indexes"): + tqdm.write(f"Importing data for index '{index_name}'") + for namespace_meta in index_meta: + self.set_dims(namespace_meta, index_name) + + # Create or get the index + index_name = self.create_new_name(index_name, self.client.collections.list_all().keys()) + index = self.client.collections.get(index_name) + + # Load data from the Parquet files + data_path = namespace_meta["data_path"] + final_data_path = self.get_final_data_path(data_path) + parquet_files = self.get_parquet_files(final_data_path) + + vectors = {} + metadata = {} + + # for file in tqdm(parquet_files, desc="Loading data from parquet files"): + # file_path = os.path.join(final_data_path, file) + # df = self.read_parquet_progress(file_path) + + # if len(vectors) > (self.args.get("max_num_rows") or INT_MAX): + # max_hit = True + # break + + # self.update_vectors(vectors, vector_column_name, df) + # self.update_metadata(metadata, vector_column_names, df) + # if max_hit: + # break + + # tqdm.write(f"Loaded {len(vectors)} vectors from {len(parquet_files)} parquet files") + + # # Upsert the vectors and metadata to the Weaviate index in batches + # BATCH_SIZE = self.args.get("batch_size", 1000) or 1000 + # current_batch_size = BATCH_SIZE + # start_idx = 0 + + # while start_idx < len(vectors): + # end_idx = min(start_idx + current_batch_size, len(vectors)) + + # batch_vectors = [ + # ( + # str(id), + # vector, + # { + # k: v + # for k, v in metadata.get(id, {}).items() + # if v is not None + # } if len(metadata.get(id, {}).keys()) > 0 else None + # ) + # for id, vector in list(vectors.items())[start_idx:end_idx] + # ] + + # try: + # resp = index.batch.create(batch_vectors) + # total_imported_count += len(batch_vectors) + # start_idx += len(batch_vectors) + # except Exception as e: + # tqdm.write(f"Error upserting vectors for index '{index_name}', {e}") + # if current_batch_size < BATCH_SIZE / 100: + # tqdm.write("Batch size is not the issue. Aborting import") + # raise e + # current_batch_size = int(2 * current_batch_size / 3) + # tqdm.write(f"Reducing batch size to {current_batch_size}") + # continue + + # tqdm.write(f"Data import completed successfully. Imported {total_imported_count} vectors") + # self.args["imported_count"] = total_imported_count \ No newline at end of file From fa5f40bdc36ce76f845952d0bbdf1eb9aa6ab944 Mon Sep 17 00:00:00 2001 From: emekaokoli19 Date: Mon, 6 May 2024 05:51:23 +0100 Subject: [PATCH 2/5] changes --- src/vdf_io/export_vdf/weaviate_export.py | 67 ++++++++----- src/vdf_io/import_vdf/weaviate_import.py | 119 ++++++++++++----------- 2 files changed, 102 insertions(+), 84 deletions(-) diff --git a/src/vdf_io/export_vdf/weaviate_export.py b/src/vdf_io/export_vdf/weaviate_export.py index 0072fcd..cb13da5 100644 --- a/src/vdf_io/export_vdf/weaviate_export.py +++ b/src/vdf_io/export_vdf/weaviate_export.py @@ -1,13 +1,14 @@ import os - -from tqdm import tqdm import weaviate import json +from tqdm import tqdm +from weaviate.classes.query import MetadataQuery from vdf_io.export_vdf.vdb_export_cls import ExportVDB from vdf_io.meta_types import NamespaceMeta from vdf_io.names import DBNames from vdf_io.util import set_arg_from_input, set_arg_from_password +from vdf_io.constants import DEFAULT_BATCH_SIZE from typing import Dict, List # Set these environment variables @@ -28,9 +29,13 @@ def make_parser(cls, subparsers): parser_weaviate.add_argument("--url", type=str, help="URL of Weaviate instance") parser_weaviate.add_argument("--api_key", type=str, help="Weaviate API key") parser_weaviate.add_argument("--openai_api_key", type=str, help="Openai API key") - parser_weaviate.add_arguments( + parser_weaviate.add_argument( "--batch_size", type=int, help="batch size for fetching", - default=1000 + default=DEFAULT_BATCH_SIZE + ) + parser_weaviate.add_argument( + "--offset", type=int, help="offset for fetching", + default=None ) parser_weaviate.add_argument( "--connection-type", type=str, choices=["local", "cloud"], default="cloud", @@ -100,39 +105,50 @@ def get_index_names(self): ) return [c for c in self.all_classes if c in input_classes] + def metadata_to_dict(self, metadata): + meta_data = {} + meta_data["creation_time"] = metadata.creation_time + meta_data["distance"] = metadata.distance + meta_data["certainty"] = metadata.certainty + meta_data["explain_score"] = metadata.explain_score + meta_data["is_consistent"] = metadata.is_consistent + meta_data["last_update_time"] = metadata.last_update_time + meta_data["rerank_score"] = metadata.rerank_score + meta_data["score"] = metadata.score + + return meta_data + def get_data(self): # Get the index names to export index_names = self.get_index_names() index_metas: Dict[str, List[NamespaceMeta]] = {} + # Export data in batches + batch_size = self.args["batch_size"] + offset = self.args["offset"] + # Iterate over index names and fetch data for index_name in index_names: collection = self.client.collections.get(index_name) - response = collection.aggregate.over_all(total_count=True) - total_vector_count = response.total_count + response = collection.query.fetch_objects( + limit=batch_size, + offset=offset, + include_vector=True, + return_metadata=MetadataQuery.full() + ) + res = collection.aggregate.over_all(total_count=True) + total_vector_count = res.total_count # Create vectors directory for this index vectors_directory = self.create_vec_dir(index_name) - # Export data in batches - batch_size = self.args["batch_size"] - num_batches = (total_vector_count + batch_size - 1) // batch_size - num_vectors_exported = 0 - - for batch_idx in tqdm(range(num_batches), desc=f"Exporting {index_name}"): - offset = batch_idx * batch_size - objects = collection.objects.limit(batch_size).offset(offset).get() - - # Extract vectors and metadata - vectors = {obj.id: obj.vector for obj in objects} - metadata = {} - # Need a better way - for obj in objects: - metadata[obj.id] = {attr: getattr(obj, attr) for attr in dir(obj) if not attr.startswith("__")} - + for obj in response.objects: + vectors = obj.vector + metadata = obj.metadata + metadata = self.metadata_to_dict(metadata=metadata) # Save vectors and metadata to Parquet file - num_vectors_exported += self.save_vectors_to_parquet( + num_vectors_exported = self.save_vectors_to_parquet( vectors, metadata, vectors_directory ) @@ -143,7 +159,7 @@ def get_data(self): vectors_directory, total=total_vector_count, num_vectors_exported=num_vectors_exported, - dim=300, # Not sure of the dimensions + dim=-1, distance="Cosine", ) ] @@ -154,7 +170,8 @@ def get_data(self): internal_metadata = self.get_basic_vdf_meta(index_metas) meta_text = json.dumps(internal_metadata.model_dump(), indent=4) tqdm.write(meta_text) - + with open(os.path.join(self.vdf_directory, "VDF_META.json"), "w") as json_file: + json_file.write(meta_text) print("Data export complete.") return True diff --git a/src/vdf_io/import_vdf/weaviate_import.py b/src/vdf_io/import_vdf/weaviate_import.py index 374b713..b62ac2a 100644 --- a/src/vdf_io/import_vdf/weaviate_import.py +++ b/src/vdf_io/import_vdf/weaviate_import.py @@ -1,10 +1,10 @@ import os import weaviate -import json from tqdm import tqdm from vdf_io.import_vdf.vdf_import_cls import ImportVDB from vdf_io.names import DBNames from vdf_io.util import set_arg_from_input, set_arg_from_password +from vdf_io.constants import INT_MAX, DEFAULT_BATCH_SIZE # Set these environment variables URL = os.getenv("YOUR_WCS_URL") @@ -25,6 +25,14 @@ def make_parser(cls, subparsers): parser_weaviate.add_argument( "--index_name", type=str, help="Name of the index in Weaviate" ) + parser_weaviate.add_argument( + "--connection-type", type=str, choices=["local", "cloud"], default="cloud", + help="Type of connection to Weaviate (local or cloud)" + ) + parser_weaviate.add_argument( + "--batch_size", type=int, help="batch size for fetching", + default=DEFAULT_BATCH_SIZE + ) @classmethod def import_vdb(cls, args): @@ -34,18 +42,24 @@ def import_vdb(cls, args): "Enter the URL of Weaviate instance: ", str, ) - set_arg_from_password( - args, - "api_key", - "Enter the Weaviate API key: ", - "WEAVIATE_API_KEY", - ) set_arg_from_input( args, "index_name", "Enter the name of the index in Weaviate: ", str, ) + set_arg_from_input( + args, + "connection_type", + "Enter 'local' or 'cloud' for connection types: ", + choices=['local', 'cloud'], + ) + set_arg_from_password( + args, + "api_key", + "Enter the Weaviate API key: ", + "WEAVIATE_API_KEY", + ) weaviate_import = ImportWeaviate(args) weaviate_import.upsert_data() return weaviate_import @@ -76,7 +90,6 @@ def upsert_data(self): # Create or get the index index_name = self.create_new_name(index_name, self.client.collections.list_all().keys()) - index = self.client.collections.get(index_name) # Load data from the Parquet files data_path = namespace_meta["data_path"] @@ -85,55 +98,43 @@ def upsert_data(self): vectors = {} metadata = {} + vector_column_names, vector_column_name = self.get_vector_column_name( + index_name, namespace_meta + ) - # for file in tqdm(parquet_files, desc="Loading data from parquet files"): - # file_path = os.path.join(final_data_path, file) - # df = self.read_parquet_progress(file_path) - - # if len(vectors) > (self.args.get("max_num_rows") or INT_MAX): - # max_hit = True - # break - - # self.update_vectors(vectors, vector_column_name, df) - # self.update_metadata(metadata, vector_column_names, df) - # if max_hit: - # break - - # tqdm.write(f"Loaded {len(vectors)} vectors from {len(parquet_files)} parquet files") - - # # Upsert the vectors and metadata to the Weaviate index in batches - # BATCH_SIZE = self.args.get("batch_size", 1000) or 1000 - # current_batch_size = BATCH_SIZE - # start_idx = 0 - - # while start_idx < len(vectors): - # end_idx = min(start_idx + current_batch_size, len(vectors)) - - # batch_vectors = [ - # ( - # str(id), - # vector, - # { - # k: v - # for k, v in metadata.get(id, {}).items() - # if v is not None - # } if len(metadata.get(id, {}).keys()) > 0 else None - # ) - # for id, vector in list(vectors.items())[start_idx:end_idx] - # ] - - # try: - # resp = index.batch.create(batch_vectors) - # total_imported_count += len(batch_vectors) - # start_idx += len(batch_vectors) - # except Exception as e: - # tqdm.write(f"Error upserting vectors for index '{index_name}', {e}") - # if current_batch_size < BATCH_SIZE / 100: - # tqdm.write("Batch size is not the issue. Aborting import") - # raise e - # current_batch_size = int(2 * current_batch_size / 3) - # tqdm.write(f"Reducing batch size to {current_batch_size}") - # continue - - # tqdm.write(f"Data import completed successfully. Imported {total_imported_count} vectors") - # self.args["imported_count"] = total_imported_count \ No newline at end of file + for file in tqdm(parquet_files, desc="Loading data from parquet files"): + file_path = os.path.join(final_data_path, file) + df = self.read_parquet_progress(file_path) + + if len(vectors) > (self.args.get("max_num_rows") or INT_MAX): + max_hit = True + break + if len(vectors) + len(df) > ( + self.args.get("max_num_rows") or INT_MAX + ): + df = df.head( + (self.args.get("max_num_rows") or INT_MAX) - len(vectors) + ) + max_hit = True + self.update_vectors(vectors, vector_column_name, df) + self.update_metadata(metadata, vector_column_names, df) + if max_hit: + break + + tqdm.write(f"Loaded {len(vectors)} vectors from {len(parquet_files)} parquet files") + + # Upsert the vectors and metadata to the Weaviate index in batches + BATCH_SIZE = self.args.get("batch_size") + + with self.client.batch.fixed_size(batch_size=BATCH_SIZE) as batch: + for _, vector in vectors.items(): + batch.add_object( + vector=vector, + collection=index_name + #TODO: Find way to add Metadata + ) + total_imported_count += 1 + + + tqdm.write(f"Data import completed successfully. Imported {total_imported_count} vectors") + self.args["imported_count"] = total_imported_count \ No newline at end of file From 4e18e486a8122a6a47582e6815ab2c3aebef449c Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 6 May 2024 04:55:25 +0000 Subject: [PATCH 3/5] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/vdf_io/export_vdf/weaviate_export.py | 26 ++++++++----- src/vdf_io/import_vdf/weaviate_import.py | 48 ++++++++++++++---------- 2 files changed, 45 insertions(+), 29 deletions(-) diff --git a/src/vdf_io/export_vdf/weaviate_export.py b/src/vdf_io/export_vdf/weaviate_export.py index cb13da5..52048e4 100644 --- a/src/vdf_io/export_vdf/weaviate_export.py +++ b/src/vdf_io/export_vdf/weaviate_export.py @@ -28,18 +28,24 @@ def make_parser(cls, subparsers): parser_weaviate.add_argument("--url", type=str, help="URL of Weaviate instance") parser_weaviate.add_argument("--api_key", type=str, help="Weaviate API key") - parser_weaviate.add_argument("--openai_api_key", type=str, help="Openai API key") parser_weaviate.add_argument( - "--batch_size", type=int, help="batch size for fetching", - default=DEFAULT_BATCH_SIZE + "--openai_api_key", type=str, help="Openai API key" ) parser_weaviate.add_argument( - "--offset", type=int, help="offset for fetching", - default=None + "--batch_size", + type=int, + help="batch size for fetching", + default=DEFAULT_BATCH_SIZE, ) parser_weaviate.add_argument( - "--connection-type", type=str, choices=["local", "cloud"], default="cloud", - help="Type of connection to Weaviate (local or cloud)" + "--offset", type=int, help="offset for fetching", default=None + ) + parser_weaviate.add_argument( + "--connection-type", + type=str, + choices=["local", "cloud"], + default="cloud", + help="Type of connection to Weaviate (local or cloud)", ) parser_weaviate.add_argument( "--classes", type=str, help="Classes to export (comma-separated)" @@ -57,7 +63,7 @@ def export_vdb(cls, args): args, "connection_type", "Enter 'local' or 'cloud' for connection types: ", - choices=['local', 'cloud'], + choices=["local", "cloud"], ) set_arg_from_password( args, @@ -88,7 +94,7 @@ def __init__(self, args): self.client = weaviate.connect_to_wcs( cluster_url=self.args["url"], auth_credentials=weaviate.auth.AuthApiKey(self.args["api_key"]), - headers={'X-OpenAI-Api-key': self.args["openai_api_key"]} + headers={"X-OpenAI-Api-key": self.args["openai_api_key"]} if self.args["openai_api_key"] else None, skip_init_checks=True, @@ -134,7 +140,7 @@ def get_data(self): limit=batch_size, offset=offset, include_vector=True, - return_metadata=MetadataQuery.full() + return_metadata=MetadataQuery.full(), ) res = collection.aggregate.over_all(total_count=True) total_vector_count = res.total_count diff --git a/src/vdf_io/import_vdf/weaviate_import.py b/src/vdf_io/import_vdf/weaviate_import.py index b62ac2a..657c472 100644 --- a/src/vdf_io/import_vdf/weaviate_import.py +++ b/src/vdf_io/import_vdf/weaviate_import.py @@ -26,12 +26,17 @@ def make_parser(cls, subparsers): "--index_name", type=str, help="Name of the index in Weaviate" ) parser_weaviate.add_argument( - "--connection-type", type=str, choices=["local", "cloud"], default="cloud", - help="Type of connection to Weaviate (local or cloud)" + "--connection-type", + type=str, + choices=["local", "cloud"], + default="cloud", + help="Type of connection to Weaviate (local or cloud)", ) parser_weaviate.add_argument( - "--batch_size", type=int, help="batch size for fetching", - default=DEFAULT_BATCH_SIZE + "--batch_size", + type=int, + help="batch size for fetching", + default=DEFAULT_BATCH_SIZE, ) @classmethod @@ -52,7 +57,7 @@ def import_vdb(cls, args): args, "connection_type", "Enter 'local' or 'cloud' for connection types: ", - choices=['local', 'cloud'], + choices=["local", "cloud"], ) set_arg_from_password( args, @@ -72,7 +77,7 @@ def __init__(self, args): self.client = weaviate.connect_to_wcs( cluster_url=self.args["url"], auth_credentials=weaviate.auth.AuthApiKey(self.args["api_key"]), - headers={'X-OpenAI-Api-key': self.args["openai_api_key"]} + headers={"X-OpenAI-Api-key": self.args["openai_api_key"]} if self.args["openai_api_key"] else None, skip_init_checks=True, @@ -83,13 +88,17 @@ def upsert_data(self): total_imported_count = 0 # Iterate over the indexes and import the data - for index_name, index_meta in tqdm(self.vdf_meta["indexes"].items(), desc="Importing indexes"): + for index_name, index_meta in tqdm( + self.vdf_meta["indexes"].items(), desc="Importing indexes" + ): tqdm.write(f"Importing data for index '{index_name}'") for namespace_meta in index_meta: self.set_dims(namespace_meta, index_name) # Create or get the index - index_name = self.create_new_name(index_name, self.client.collections.list_all().keys()) + index_name = self.create_new_name( + index_name, self.client.collections.list_all().keys() + ) # Load data from the Parquet files data_path = namespace_meta["data_path"] @@ -107,11 +116,9 @@ def upsert_data(self): df = self.read_parquet_progress(file_path) if len(vectors) > (self.args.get("max_num_rows") or INT_MAX): - max_hit = True - break - if len(vectors) + len(df) > ( - self.args.get("max_num_rows") or INT_MAX - ): + max_hit = True + break + if len(vectors) + len(df) > (self.args.get("max_num_rows") or INT_MAX): df = df.head( (self.args.get("max_num_rows") or INT_MAX) - len(vectors) ) @@ -121,7 +128,9 @@ def upsert_data(self): if max_hit: break - tqdm.write(f"Loaded {len(vectors)} vectors from {len(parquet_files)} parquet files") + tqdm.write( + f"Loaded {len(vectors)} vectors from {len(parquet_files)} parquet files" + ) # Upsert the vectors and metadata to the Weaviate index in batches BATCH_SIZE = self.args.get("batch_size") @@ -130,11 +139,12 @@ def upsert_data(self): for _, vector in vectors.items(): batch.add_object( vector=vector, - collection=index_name - #TODO: Find way to add Metadata + collection=index_name, + # TODO: Find way to add Metadata ) total_imported_count += 1 - - tqdm.write(f"Data import completed successfully. Imported {total_imported_count} vectors") - self.args["imported_count"] = total_imported_count \ No newline at end of file + tqdm.write( + f"Data import completed successfully. Imported {total_imported_count} vectors" + ) + self.args["imported_count"] = total_imported_count From ad6920c118ec88540060a3abbec4850824c842f6 Mon Sep 17 00:00:00 2001 From: dhruv-anand-aintech Date: Wed, 15 May 2024 17:13:14 +0530 Subject: [PATCH 4/5] fixing some things --- src/vdf_io/export_vdf/weaviate_export.py | 32 +++++------------------ src/vdf_io/import_vdf/weaviate_import.py | 33 +++--------------------- src/vdf_io/weaviate_util.py | 31 ++++++++++++++++++++++ 3 files changed, 40 insertions(+), 56 deletions(-) create mode 100644 src/vdf_io/weaviate_util.py diff --git a/src/vdf_io/export_vdf/weaviate_export.py b/src/vdf_io/export_vdf/weaviate_export.py index 52048e4..b71d10a 100644 --- a/src/vdf_io/export_vdf/weaviate_export.py +++ b/src/vdf_io/export_vdf/weaviate_export.py @@ -1,20 +1,17 @@ import os import weaviate import json - +from typing import Dict, List from tqdm import tqdm + from weaviate.classes.query import MetadataQuery + from vdf_io.export_vdf.vdb_export_cls import ExportVDB from vdf_io.meta_types import NamespaceMeta from vdf_io.names import DBNames -from vdf_io.util import set_arg_from_input, set_arg_from_password +from vdf_io.util import set_arg_from_input from vdf_io.constants import DEFAULT_BATCH_SIZE -from typing import Dict, List - -# Set these environment variables -URL = os.getenv("YOUR_WCS_URL") -APIKEY = os.getenv("YOUR_WCS_API_KEY") -OPENAI_APIKEY = os.getenv("OPENAI_APIKEY") +from vdf_io.weaviate_util import prompt_for_creds class ExportWeaviate(ExportVDB): @@ -53,24 +50,7 @@ def make_parser(cls, subparsers): @classmethod def export_vdb(cls, args): - set_arg_from_input( - args, - "url", - "Enter the URL of Weaviate instance: ", - str, - ) - set_arg_from_input( - args, - "connection_type", - "Enter 'local' or 'cloud' for connection types: ", - choices=["local", "cloud"], - ) - set_arg_from_password( - args, - "api_key", - "Enter the Weaviate API key: ", - "WEAVIATE_API_KEY", - ) + prompt_for_creds(args) weaviate_export = ExportWeaviate(args) weaviate_export.all_classes = list( weaviate_export.client.collections.list_all().keys() diff --git a/src/vdf_io/import_vdf/weaviate_import.py b/src/vdf_io/import_vdf/weaviate_import.py index 657c472..7d9e150 100644 --- a/src/vdf_io/import_vdf/weaviate_import.py +++ b/src/vdf_io/import_vdf/weaviate_import.py @@ -5,6 +5,7 @@ from vdf_io.names import DBNames from vdf_io.util import set_arg_from_input, set_arg_from_password from vdf_io.constants import INT_MAX, DEFAULT_BATCH_SIZE +from vdf_io.weaviate_util import prompt_for_creds # Set these environment variables URL = os.getenv("YOUR_WCS_URL") @@ -22,9 +23,6 @@ def make_parser(cls, subparsers): parser_weaviate.add_argument("--url", type=str, help="URL of Weaviate instance") parser_weaviate.add_argument("--api_key", type=str, help="Weaviate API key") - parser_weaviate.add_argument( - "--index_name", type=str, help="Name of the index in Weaviate" - ) parser_weaviate.add_argument( "--connection-type", type=str, @@ -41,30 +39,7 @@ def make_parser(cls, subparsers): @classmethod def import_vdb(cls, args): - set_arg_from_input( - args, - "url", - "Enter the URL of Weaviate instance: ", - str, - ) - set_arg_from_input( - args, - "index_name", - "Enter the name of the index in Weaviate: ", - str, - ) - set_arg_from_input( - args, - "connection_type", - "Enter 'local' or 'cloud' for connection types: ", - choices=["local", "cloud"], - ) - set_arg_from_password( - args, - "api_key", - "Enter the Weaviate API key: ", - "WEAVIATE_API_KEY", - ) + prompt_for_creds(args) weaviate_import = ImportWeaviate(args) weaviate_import.upsert_data() return weaviate_import @@ -77,9 +52,7 @@ def __init__(self, args): self.client = weaviate.connect_to_wcs( cluster_url=self.args["url"], auth_credentials=weaviate.auth.AuthApiKey(self.args["api_key"]), - headers={"X-OpenAI-Api-key": self.args["openai_api_key"]} - if self.args["openai_api_key"] - else None, + headers={"X-OpenAI-Api-key": self.args.get("openai_api_key", "")}, skip_init_checks=True, ) diff --git a/src/vdf_io/weaviate_util.py b/src/vdf_io/weaviate_util.py new file mode 100644 index 0000000..827b324 --- /dev/null +++ b/src/vdf_io/weaviate_util.py @@ -0,0 +1,31 @@ +from vdf_io.util import set_arg_from_input, set_arg_from_password + + +def prompt_for_creds(args): + set_arg_from_input( + args, + "connection_type", + "Enter 'local' or 'cloud' for connection types: ", + choices=["local", "cloud"], + ) + if args["connection_type"] == "cloud": + set_arg_from_input( + args, + "url", + "Enter the URL of Weaviate instance: ", + str, + env_var="WEAVIATE_URL", + ) + set_arg_from_password( + args, + "api_key", + "Enter the Weaviate API key: ", + "WEAVIATE_API_KEY", + ) + + set_arg_from_password( + args, + "api_key", + "Enter the Weaviate API key: ", + "WEAVIATE_API_KEY", + ) From 8c311b79c66eede29cb7a7de41419f8685449b19 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Wed, 15 May 2024 11:45:16 +0000 Subject: [PATCH 5/5] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/vdf_io/import_vdf/weaviate_import.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/vdf_io/import_vdf/weaviate_import.py b/src/vdf_io/import_vdf/weaviate_import.py index 7d9e150..f28befc 100644 --- a/src/vdf_io/import_vdf/weaviate_import.py +++ b/src/vdf_io/import_vdf/weaviate_import.py @@ -3,7 +3,6 @@ from tqdm import tqdm from vdf_io.import_vdf.vdf_import_cls import ImportVDB from vdf_io.names import DBNames -from vdf_io.util import set_arg_from_input, set_arg_from_password from vdf_io.constants import INT_MAX, DEFAULT_BATCH_SIZE from vdf_io.weaviate_util import prompt_for_creds