forked from qdrant/vector-db-benchmark
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathclient_factory.py
93 lines (81 loc) · 3.01 KB
/
client_factory.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
from abc import ABC
from typing import List, Type
from engine.base_client.client import (
BaseClient,
BaseConfigurator,
BaseSearcher,
BaseUploader,
)
from engine.clients.elasticsearch.configure import ElasticConfigurator
from engine.clients.elasticsearch.search import ElasticSearcher
from engine.clients.elasticsearch.upload import ElasticUploader
from engine.clients.milvus import MilvusConfigurator, MilvusSearcher, MilvusUploader
from engine.clients.qdrant import QdrantConfigurator, QdrantSearcher, QdrantUploader
from engine.clients.redis.configure import RedisConfigurator
from engine.clients.redis.search import RedisSearcher
from engine.clients.redis.upload import RedisUploader
from engine.clients.weaviate import (
WeaviateConfigurator,
WeaviateSearcher,
WeaviateUploader,
)
ENGINE_CONFIGURATORS = {
"qdrant": QdrantConfigurator,
"weaviate": WeaviateConfigurator,
"milvus": MilvusConfigurator,
"elastic": ElasticConfigurator,
"redis": RedisConfigurator,
}
ENGINE_UPLOADERS = {
"qdrant": QdrantUploader,
"weaviate": WeaviateUploader,
"milvus": MilvusUploader,
"elastic": ElasticUploader,
"redis": RedisUploader,
}
ENGINE_SEARCHERS = {
"qdrant": QdrantSearcher,
"weaviate": WeaviateSearcher,
"milvus": MilvusSearcher,
"elastic": ElasticSearcher,
"redis": RedisSearcher,
}
class ClientFactory(ABC):
def __init__(self, host):
self.host = host
def _create_configurator(self, experiment) -> BaseConfigurator:
engine_configurator_class = ENGINE_CONFIGURATORS[experiment["engine"]]
engine_configurator = engine_configurator_class(
self.host,
collection_params={**experiment.get("collection_params", {})},
connection_params={**experiment.get("connection_params", {})},
)
return engine_configurator
def _create_uploader(self, experiment) -> BaseUploader:
engine_uploader_class = ENGINE_UPLOADERS[experiment["engine"]]
engine_uploader = engine_uploader_class(
self.host,
connection_params={**experiment.get("connection_params", {})},
upload_params={**experiment.get("upload_params", {})},
)
return engine_uploader
def _create_searchers(self, experiment) -> List[BaseSearcher]:
engine_searcher_class: Type[BaseSearcher] = ENGINE_SEARCHERS[
experiment["engine"]
]
engine_searchers = [
engine_searcher_class(
self.host,
connection_params={**experiment.get("connection_params", {})},
search_params=search_params,
)
for search_params in experiment.get("search_params", [{}])
]
return engine_searchers
def build_client(self, experiment):
return BaseClient(
name=experiment["name"],
configurator=self._create_configurator(experiment),
uploader=self._create_uploader(experiment),
searchers=self._create_searchers(experiment),
)