@@ -64,7 +64,7 @@ def __init__(
self._sample_docs_in_coll_info = sample_docs_in_collection_info
self._indexes_in_coll_info = indexes_in_collection_info

_append_client_metadata(self._client)
_append_client_metadata(self._client, DRIVER_METADATA)

@classmethod
def from_connection_string(
@@ -112,7 +112,7 @@ def __init__(
if connection_string:
raise ValueError("Must provide connection_string or client, not both")
self.client = client
_append_client_metadata(self.client)
_append_client_metadata(self.client, DRIVER_METADATA)
elif connection_string:
try:
self.client = MongoClient(
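The MongoClient(...) call is cut off here, but it presumably attaches the same metadata at construction time via PyMongo's driver parameter, which is the other way DriverInfo gets onto a client besides append_metadata. A minimal sketch, assuming a placeholder connection string:

```python
from importlib.metadata import version

from pymongo import MongoClient
from pymongo.driver_info import DriverInfo

# Same metadata object defined in langchain_mongodb.utils.
DRIVER_METADATA = DriverInfo(name="Langchain", version=version("langchain-mongodb"))

# Placeholder URI; the DriverInfo is folded into the client handshake metadata.
client = MongoClient("mongodb://localhost:27017", driver=DRIVER_METADATA)
```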
10 changes: 6 additions & 4 deletions libs/langchain-mongodb/langchain_mongodb/docstores.py
@@ -37,7 +37,7 @@ def __init__(self, collection: Collection, text_key: str = "page_content") -> No
self.collection = collection
self._text_key = text_key

_append_client_metadata(self.collection.database.client)
_append_client_metadata(self.collection.database.client, DRIVER_METADATA)

@classmethod
def from_connection_string(
@@ -99,12 +99,13 @@ def mset(
batch_size: Number of documents to insert at a time.
Tuning this may help with performance and sidestep MongoDB limits.
"""
keys, docs = zip(*key_value_pairs)
keys, docs = zip(*key_value_pairs, strict=False)
n_docs = len(docs)
start = 0
for end in range(batch_size, n_docs + batch_size, batch_size):
texts, metadatas = zip(
*[(doc.page_content, doc.metadata) for doc in docs[start:end]]
*[(doc.page_content, doc.metadata) for doc in docs[start:end]],
strict=False,
)
self.insert_many(texts=texts, metadatas=metadatas, ids=keys[start:end]) # type: ignore
start = end
Expand Down Expand Up @@ -149,6 +150,7 @@ def insert_many(
in the batch that do not have conflicting _ids will still be inserted.
"""
to_insert = [
{"_id": i, self._text_key: t, **m} for i, t, m in zip(ids, texts, metadatas)
{"_id": i, self._text_key: t, **m}
for i, t, m in zip(ids, texts, metadatas, strict=False)
]
self.collection.insert_many(to_insert) # type: ignore
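A note on the strict=False arguments added in this file and later in the diff: zip() gained the strict keyword in Python 3.10, and strict=False spells out the long-standing default of truncating to the shortest iterable, presumably added to satisfy a lint rule such as ruff's B905 rather than to change behavior. A minimal sketch:

```python
# zip() with and without strict (Python 3.10+). strict=False keeps the old
# truncate-to-shortest behavior; strict=True raises on a length mismatch.
keys = ["a", "b", "c"]
docs = ["doc-a", "doc-b"]

print(list(zip(keys, docs, strict=False)))  # [('a', 'doc-a'), ('b', 'doc-b')]

try:
    list(zip(keys, docs, strict=True))
except ValueError as err:
    print(err)  # zip() argument 2 is shorter than argument 1
```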
2 changes: 1 addition & 1 deletion libs/langchain-mongodb/langchain_mongodb/graphrag/graph.py
@@ -186,7 +186,7 @@ def __init__(
self.collection = collection

# append_metadata was added in PyMongo 4.14.0, but is a valid database name on earlier versions
_append_client_metadata(collection.database.client)
_append_client_metadata(collection.database.client, DRIVER_METADATA)

self.entity_extraction_model = entity_extraction_model
self.entity_prompt = (
109 changes: 7 additions & 102 deletions libs/langchain-mongodb/langchain_mongodb/index.py
@@ -7,6 +7,13 @@
from pymongo.collection import Collection
from pymongo.operations import SearchIndexModel

# Don't break imports for modules that expect these functions
# to be in this module.
from pymongo_search_utils import ( # noqa: F401
create_vector_search_index,
update_vector_search_index,
)

logger = logging.getLogger(__file__)


@@ -34,60 +41,6 @@ def _vector_search_index_definition(
return definition


def create_vector_search_index(
collection: Collection,
index_name: str,
dimensions: int,
path: str,
similarity: str,
filters: Optional[List[str]] = None,
*,
wait_until_complete: Optional[float] = None,
**kwargs: Any,
) -> None:
"""Experimental Utility function to create a vector search index

Args:
collection (Collection): MongoDB Collection
index_name (str): Name of Index
dimensions (int): Number of dimensions in embedding
path (str): field with vector embedding
similarity (str): The similarity score used for the index
filters (List[str]): Fields/paths to index to allow filtering in $vectorSearch
wait_until_complete (Optional[float]): If provided, number of seconds to wait
until search index is ready.
kwargs: Keyword arguments supplying any additional options to SearchIndexModel.
"""
logger.info("Creating Search Index %s on %s", index_name, collection.name)

if collection.name not in collection.database.list_collection_names(
authorizedCollections=True
):
collection.database.create_collection(collection.name)

result = collection.create_search_index(
SearchIndexModel(
definition=_vector_search_index_definition(
dimensions=dimensions,
path=path,
similarity=similarity,
filters=filters,
**kwargs,
),
name=index_name,
type="vectorSearch",
)
)

if wait_until_complete:
_wait_for_predicate(
predicate=lambda: _is_index_ready(collection, index_name),
err=f"{index_name=} did not complete in {wait_until_complete}!",
timeout=wait_until_complete,
)
logger.info(result)


def drop_vector_search_index(
collection: Collection,
index_name: str,
@@ -115,54 +68,6 @@ def drop_vector_search_index(
logger.info("Vector Search index %s.%s dropped", collection.name, index_name)


def update_vector_search_index(
collection: Collection,
index_name: str,
dimensions: int,
path: str,
similarity: str,
filters: Optional[List[str]] = None,
*,
wait_until_complete: Optional[float] = None,
**kwargs: Any,
) -> None:
"""Update a search index.

Replace the existing index definition with the provided definition.

Args:
collection (Collection): MongoDB Collection
index_name (str): Name of Index
dimensions (int): Number of dimensions in embedding
path (str): field with vector embedding
similarity (str): The similarity score used for the index.
filters (List[str]): Fields/paths to index to allow filtering in $vectorSearch
wait_until_complete (Optional[float]): If provided, number of seconds to wait
until search index is ready.
kwargs: Keyword arguments supplying any additional options to SearchIndexModel.
"""
logger.info(
"Updating Search Index %s from Collection: %s", index_name, collection.name
)
collection.update_search_index(
name=index_name,
definition=_vector_search_index_definition(
dimensions=dimensions,
path=path,
similarity=similarity,
filters=filters,
**kwargs,
),
)
if wait_until_complete:
_wait_for_predicate(
predicate=lambda: _is_index_ready(collection, index_name),
err=f"Index {index_name} update did not complete in {wait_until_complete}!",
timeout=wait_until_complete,
)
logger.info("Update succeeded")


def _is_index_ready(collection: Collection, index_name: str) -> bool:
"""Check for the index name in the list of available search indexes to see if the
specified index is of status READY
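Because create_vector_search_index and update_vector_search_index are now re-exported from pymongo_search_utils near the top of index.py (the noqa: F401 import above), existing imports should keep working. A minimal sketch of the presumed equivalence; the commented call is hypothetical and simply mirrors the signature of the function removed above:

```python
# Both import paths should resolve to the same function after this change:
# the implementation lives in pymongo_search_utils and is re-exported from
# langchain_mongodb.index for backwards compatibility.
from langchain_mongodb.index import create_vector_search_index as legacy_import
from pymongo_search_utils import create_vector_search_index as new_import

assert legacy_import is new_import

# Hypothetical usage (collection, dimensions, and field names are illustrative):
# legacy_import(
#     collection=db["docs"],          # a pymongo Collection
#     index_name="vector_index",
#     dimensions=1536,
#     path="embedding",
#     similarity="cosine",
#     filters=["metadata.page"],
#     wait_until_complete=60.0,
# )
```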
4 changes: 2 additions & 2 deletions libs/langchain-mongodb/langchain_mongodb/indexes.py
@@ -36,7 +36,7 @@ def __init__(self, collection: Collection) -> None:
super().__init__(namespace=namespace)
self._collection = collection

_append_client_metadata(self._collection.database.client)
_append_client_metadata(self._collection.database.client, DRIVER_METADATA)

@classmethod
def from_connection_string(
@@ -85,7 +85,7 @@ def update(
if len(keys) != len(group_ids):
raise ValueError("Number of keys does not match number of group_ids")

for key, group_id in zip(keys, group_ids):
for key, group_id in zip(keys, group_ids, strict=False):
self._collection.find_one_and_update(
{"namespace": self.namespace, "key": key},
{"$set": {"group_id": group_id, "updated_at": self.get_time()}},
2 changes: 1 addition & 1 deletion libs/langchain-mongodb/langchain_mongodb/loaders.py
@@ -54,7 +54,7 @@ def __init__(
self.include_db_collection_in_metadata = include_db_collection_in_metadata

# append_metadata was added in PyMongo 4.14.0, but is a valid database name on earlier versions
_append_client_metadata(self.db.client)
_append_client_metadata(self.db.client, DRIVER_METADATA)

@classmethod
def from_connection_string(
@@ -8,7 +8,11 @@
from pymongo.collection import Collection

from langchain_mongodb.pipelines import text_search_stage
from langchain_mongodb.utils import _append_client_metadata, make_serializable
from langchain_mongodb.utils import (
DRIVER_METADATA,
_append_client_metadata,
make_serializable,
)


class MongoDBAtlasFullTextSearchRetriever(BaseRetriever):
@@ -64,7 +68,7 @@ def _get_relevant_documents(
)

if not self._added_metadata:
_append_client_metadata(self.collection.database.client)
_append_client_metadata(self.collection.database.client, DRIVER_METADATA)
self._added_metadata = True

# Execution
13 changes: 6 additions & 7 deletions libs/langchain-mongodb/langchain_mongodb/utils.py
@@ -24,22 +24,21 @@
from typing import Any, Dict, List, Union

import numpy as np
from pymongo import MongoClient
from pymongo.driver_info import DriverInfo

# Don't break imports for modules that expect this function
# to be in this module.
from pymongo_search_utils import (
append_client_metadata as _append_client_metadata, # noqa: F401
)

logger = logging.getLogger(__name__)

Matrix = Union[List[List[float]], List[np.ndarray], np.ndarray]

DRIVER_METADATA = DriverInfo(name="Langchain", version=version("langchain-mongodb"))


def _append_client_metadata(client: MongoClient) -> None:
# append_metadata was added in PyMongo 4.14.0, but is a valid database name on earlier versions
if callable(client.append_metadata):
client.append_metadata(DRIVER_METADATA)


def cosine_similarity(X: Matrix, Y: Matrix) -> np.ndarray:
"""Row-wise cosine similarity between two equal-width matrices."""
if len(X) == 0 or len(Y) == 0:
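The utils.py change above removes the local _append_client_metadata wrapper and re-imports it from pymongo_search_utils, which is why call sites now pass DRIVER_METADATA explicitly. A hedged sketch of what the external helper presumably does, mirroring the removed wrapper; the real implementation lives in pymongo_search_utils and may differ:

```python
from importlib.metadata import version

from pymongo import MongoClient
from pymongo.driver_info import DriverInfo

# Same metadata object that langchain_mongodb.utils still defines.
DRIVER_METADATA = DriverInfo(name="Langchain", version=version("langchain-mongodb"))


def append_client_metadata(client: MongoClient, driver_info: DriverInfo) -> None:
    # MongoClient.append_metadata was added in PyMongo 4.14.0; on older
    # versions the attribute access resolves to a Database named
    # "append_metadata", so the callable() check guards the call.
    if callable(client.append_metadata):
        client.append_metadata(driver_info)
```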
53 changes: 11 additions & 42 deletions libs/langchain-mongodb/langchain_mongodb/vectorstores.py
@@ -22,9 +22,10 @@
from langchain_core.embeddings import Embeddings
from langchain_core.runnables.config import run_in_executor
from langchain_core.vectorstores import VectorStore
from pymongo import MongoClient, ReplaceOne
from pymongo import MongoClient
from pymongo.collection import Collection
from pymongo.errors import CollectionInvalid
from pymongo_search_utils import bulk_embed_and_insert_texts

from langchain_mongodb.index import (
create_vector_search_index,
@@ -238,7 +239,7 @@ def __init__(
self._relevance_score_fn = relevance_score_fn

# append_metadata was added in PyMongo 4.14.0, but is a valid database name on earlier versions
_append_client_metadata(self._collection.database.client)
_append_client_metadata(self._collection.database.client, DRIVER_METADATA)

if auto_create_index is False:
return
@@ -356,17 +357,17 @@ def add_texts(
metadatas_batch = []
size = 0
i = 0
for j, (text, metadata) in enumerate(zip(texts, _metadatas)):
for j, (text, metadata) in enumerate(zip(texts, _metadatas, strict=False)):
size += len(text) + len(metadata)
texts_batch.append(text)
metadatas_batch.append(metadata)
if (j + 1) % batch_size == 0 or size >= 47_000_000:
if ids:
batch_res = self.bulk_embed_and_insert_texts(
batch_res = bulk_embed_and_insert_texts(
texts_batch, metadatas_batch, ids[i : j + 1]
)
[Comment on lines +366 to 368 — Copilot AI, Oct 6, 2025]
The external bulk_embed_and_insert_texts function is being called without the required self parameter that was present in the original method. This function needs to receive the embedding model, collection, and field configuration to work properly.
else:
batch_res = self.bulk_embed_and_insert_texts(
batch_res = bulk_embed_and_insert_texts(
texts_batch, metadatas_batch
)
[Comment on lines +370 to 372 — Copilot AI, Oct 6, 2025: same concern as the comment on lines +366 to 368.]
result_ids.extend(batch_res)
@@ -376,13 +377,11 @@
i = j + 1
if texts_batch:
if ids:
batch_res = self.bulk_embed_and_insert_texts(
batch_res = bulk_embed_and_insert_texts(
texts_batch, metadatas_batch, ids[i : j + 1]
)
[Comment on lines +380 to 382 — Copilot AI, Oct 6, 2025: same concern as the comment on lines +366 to 368.]
else:
batch_res = self.bulk_embed_and_insert_texts(
texts_batch, metadatas_batch
)
batch_res = bulk_embed_and_insert_texts(texts_batch, metadatas_batch)
[Copilot AI, Oct 6, 2025: same concern as the comment on lines +366 to 368.]
result_ids.extend(batch_res)
return result_ids

@@ -419,37 +418,6 @@ def get_by_ids(self, ids: Sequence[str], /) -> list[Document]:
docs.append(Document(page_content=text, id=oid_to_str(_id), metadata=doc))
return docs

def bulk_embed_and_insert_texts(
self,
texts: Union[List[str], Iterable[str]],
metadatas: Union[List[dict], Generator[dict, Any, Any]],
ids: Optional[List[str]] = None,
) -> List[str]:
"""Bulk insert single batch of texts, embeddings, and optionally ids.

See add_texts for additional details.
"""
if not texts:
return []
# Compute embedding vectors
embeddings = self._embedding.embed_documents(list(texts))
if not ids:
ids = [str(ObjectId()) for _ in range(len(list(texts)))]
docs = [
{
"_id": str_to_oid(i),
self._text_key: t,
self._embedding_key: embedding,
**m,
}
for i, t, m, embedding in zip(ids, texts, metadatas, embeddings)
]
operations = [ReplaceOne({"_id": doc["_id"]}, doc, upsert=True) for doc in docs]
# insert the documents in MongoDB Atlas
result = self._collection.bulk_write(operations)
assert result.upserted_ids is not None
return [oid_to_str(_id) for _id in result.upserted_ids.values()]

def add_documents(
self,
documents: List[Document],
@@ -478,10 +446,11 @@ def add_documents(
start = 0
for end in range(batch_size, n_docs + batch_size, batch_size):
texts, metadatas = zip(
*[(doc.page_content, doc.metadata) for doc in documents[start:end]]
*[(doc.page_content, doc.metadata) for doc in documents[start:end]],
strict=False,
)
result_ids.extend(
self.bulk_embed_and_insert_texts(
bulk_embed_and_insert_texts(
texts=texts, metadatas=metadatas, ids=ids[start:end]
[Copilot AI, Oct 6, 2025]
The external bulk_embed_and_insert_texts function is being called without the required self parameter that was present in the original method. This function needs to receive the embedding model, collection, and field configuration to work properly.

Suggested change (replace the line `texts=texts, metadatas=metadatas, ids=ids[start:end]` with):
                    embedding_model=self._embedding,
                    collection=self._collection,
                    text_key=self._text_key,
                    embedding_key=self._embedding_key,
                    metadata_key=self._metadata_key,
                    texts=texts,
                    metadatas=metadatas,
                    ids=ids[start:end],
)
)
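Reading the Copilot suggestion together with the method removed above, a caller-side sketch of how the standalone helper is presumably meant to be invoked. The keyword arguments mirror the review suggestion; the actual signature belongs to pymongo_search_utils and may differ:

```python
from typing import Optional

from langchain_mongodb import MongoDBAtlasVectorSearch
from pymongo_search_utils import bulk_embed_and_insert_texts


def add_batch(
    store: MongoDBAtlasVectorSearch,
    texts: list[str],
    metadatas: list[dict],
    ids: Optional[list[str]] = None,
) -> list[str]:
    # Forward the store's configuration explicitly, as the review suggests,
    # since the external helper no longer has access to `self`.
    return bulk_embed_and_insert_texts(
        embedding_model=store._embedding,    # Embeddings object used to vectorize texts
        collection=store._collection,        # target MongoDB collection
        text_key=store._text_key,            # document field holding the raw text
        embedding_key=store._embedding_key,  # document field holding the vector
        metadata_key=store._metadata_key,    # document field holding metadata
        texts=texts,
        metadatas=metadatas,
        ids=ids,
    )
```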