Skip to content

Commit 51292de

Browse files
committed
Merge branch 'master' of github.com:mongodb/mongo-python-driver
2 parents 8685f59 + 8675a16 commit 51292de

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

47 files changed

+2022
-841
lines changed

doc/changelog.rst

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ PyMongo 4.12 brings a number of changes including:
99
- Support for configuring DEK cache lifetime via the ``key_expiration_ms`` argument to
1010
:class:`~pymongo.encryption_options.AutoEncryptionOpts`.
1111
- Support for $lookup in CSFLE and QE supported on MongoDB 8.1+.
12+
- Added :meth:`gridfs.asynchronous.grid_file.AsyncGridFSBucket.delete_by_name` and :meth:`gridfs.grid_file.GridFSBucket.delete_by_name`
13+
for more performant deletion of a file with multiple revisions.
1214
- AsyncMongoClient no longer performs DNS resolution for "mongodb+srv://" connection strings on creation.
1315
To avoid blocking the asyncio loop, the resolution is now deferred until the client is first connected.
1416
- Added index hinting support to the

gridfs/asynchronous/grid_file.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -834,6 +834,35 @@ async def delete(self, file_id: Any, session: Optional[AsyncClientSession] = Non
834834
if not res.deleted_count:
835835
raise NoFile("no file could be deleted because none matched %s" % file_id)
836836

837+
@_csot.apply
838+
async def delete_by_name(
839+
self, filename: str, session: Optional[AsyncClientSession] = None
840+
) -> None:
841+
"""Given a filename, delete this stored file's files collection document(s)
842+
and associated chunks from a GridFS bucket.
843+
844+
For example::
845+
846+
my_db = AsyncMongoClient().test
847+
fs = AsyncGridFSBucket(my_db)
848+
await fs.upload_from_stream("test_file", "data I want to store!")
849+
await fs.delete_by_name("test_file")
850+
851+
Raises :exc:`~gridfs.errors.NoFile` if no file with the given filename exists.
852+
853+
:param filename: The name of the file to be deleted.
854+
:param session: a :class:`~pymongo.client_session.AsyncClientSession`
855+
856+
.. versionadded:: 4.12
857+
"""
858+
_disallow_transactions(session)
859+
files = self._files.find({"filename": filename}, {"_id": 1}, session=session)
860+
file_ids = [file["_id"] async for file in files]
861+
res = await self._files.delete_many({"_id": {"$in": file_ids}}, session=session)
862+
await self._chunks.delete_many({"files_id": {"$in": file_ids}}, session=session)
863+
if not res.deleted_count:
864+
raise NoFile(f"no file could be deleted because none matched filename {filename!r}")
865+
837866
def find(self, *args: Any, **kwargs: Any) -> AsyncGridOutCursor:
838867
"""Find and return the files collection documents that match ``filter``
839868

gridfs/synchronous/grid_file.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -830,6 +830,33 @@ def delete(self, file_id: Any, session: Optional[ClientSession] = None) -> None:
830830
if not res.deleted_count:
831831
raise NoFile("no file could be deleted because none matched %s" % file_id)
832832

833+
@_csot.apply
834+
def delete_by_name(self, filename: str, session: Optional[ClientSession] = None) -> None:
835+
"""Given a filename, delete this stored file's files collection document(s)
836+
and associated chunks from a GridFS bucket.
837+
838+
For example::
839+
840+
my_db = MongoClient().test
841+
fs = GridFSBucket(my_db)
842+
fs.upload_from_stream("test_file", "data I want to store!")
843+
fs.delete_by_name("test_file")
844+
845+
Raises :exc:`~gridfs.errors.NoFile` if no file with the given filename exists.
846+
847+
:param filename: The name of the file to be deleted.
848+
:param session: a :class:`~pymongo.client_session.ClientSession`
849+
850+
.. versionadded:: 4.12
851+
"""
852+
_disallow_transactions(session)
853+
files = self._files.find({"filename": filename}, {"_id": 1}, session=session)
854+
file_ids = [file["_id"] for file in files]
855+
res = self._files.delete_many({"_id": {"$in": file_ids}}, session=session)
856+
self._chunks.delete_many({"files_id": {"$in": file_ids}}, session=session)
857+
if not res.deleted_count:
858+
raise NoFile(f"no file could be deleted because none matched filename {filename!r}")
859+
833860
def find(self, *args: Any, **kwargs: Any) -> GridOutCursor:
834861
"""Find and return the files collection documents that match ``filter``
835862

pymongo/asynchronous/bulk.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ def __init__(
8787
self,
8888
collection: AsyncCollection[_DocumentType],
8989
ordered: bool,
90-
bypass_document_validation: bool,
90+
bypass_document_validation: Optional[bool],
9191
comment: Optional[str] = None,
9292
let: Optional[Any] = None,
9393
) -> None:
@@ -516,8 +516,8 @@ async def _execute_command(
516516
if self.comment:
517517
cmd["comment"] = self.comment
518518
_csot.apply_write_concern(cmd, write_concern)
519-
if self.bypass_doc_val:
520-
cmd["bypassDocumentValidation"] = True
519+
if self.bypass_doc_val is not None:
520+
cmd["bypassDocumentValidation"] = self.bypass_doc_val
521521
if self.let is not None and run.op_type in (_DELETE, _UPDATE):
522522
cmd["let"] = self.let
523523
if session:

pymongo/asynchronous/collection.py

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -701,7 +701,7 @@ async def bulk_write(
701701
self,
702702
requests: Sequence[_WriteOp[_DocumentType]],
703703
ordered: bool = True,
704-
bypass_document_validation: bool = False,
704+
bypass_document_validation: Optional[bool] = None,
705705
session: Optional[AsyncClientSession] = None,
706706
comment: Optional[Any] = None,
707707
let: Optional[Mapping] = None,
@@ -800,7 +800,7 @@ async def _insert_one(
800800
ordered: bool,
801801
write_concern: WriteConcern,
802802
op_id: Optional[int],
803-
bypass_doc_val: bool,
803+
bypass_doc_val: Optional[bool],
804804
session: Optional[AsyncClientSession],
805805
comment: Optional[Any] = None,
806806
) -> Any:
@@ -814,8 +814,8 @@ async def _insert_one(
814814
async def _insert_command(
815815
session: Optional[AsyncClientSession], conn: AsyncConnection, retryable_write: bool
816816
) -> None:
817-
if bypass_doc_val:
818-
command["bypassDocumentValidation"] = True
817+
if bypass_doc_val is not None:
818+
command["bypassDocumentValidation"] = bypass_doc_val
819819

820820
result = await conn.command(
821821
self._database.name,
@@ -840,7 +840,7 @@ async def _insert_command(
840840
async def insert_one(
841841
self,
842842
document: Union[_DocumentType, RawBSONDocument],
843-
bypass_document_validation: bool = False,
843+
bypass_document_validation: Optional[bool] = None,
844844
session: Optional[AsyncClientSession] = None,
845845
comment: Optional[Any] = None,
846846
) -> InsertOneResult:
@@ -906,7 +906,7 @@ async def insert_many(
906906
self,
907907
documents: Iterable[Union[_DocumentType, RawBSONDocument]],
908908
ordered: bool = True,
909-
bypass_document_validation: bool = False,
909+
bypass_document_validation: Optional[bool] = None,
910910
session: Optional[AsyncClientSession] = None,
911911
comment: Optional[Any] = None,
912912
) -> InsertManyResult:
@@ -986,7 +986,7 @@ async def _update(
986986
write_concern: Optional[WriteConcern] = None,
987987
op_id: Optional[int] = None,
988988
ordered: bool = True,
989-
bypass_doc_val: Optional[bool] = False,
989+
bypass_doc_val: Optional[bool] = None,
990990
collation: Optional[_CollationIn] = None,
991991
array_filters: Optional[Sequence[Mapping[str, Any]]] = None,
992992
hint: Optional[_IndexKeyHint] = None,
@@ -1041,8 +1041,8 @@ async def _update(
10411041
if comment is not None:
10421042
command["comment"] = comment
10431043
# Update command.
1044-
if bypass_doc_val:
1045-
command["bypassDocumentValidation"] = True
1044+
if bypass_doc_val is not None:
1045+
command["bypassDocumentValidation"] = bypass_doc_val
10461046

10471047
# The command result has to be published for APM unmodified
10481048
# so we make a shallow copy here before adding updatedExisting.
@@ -1082,7 +1082,7 @@ async def _update_retryable(
10821082
write_concern: Optional[WriteConcern] = None,
10831083
op_id: Optional[int] = None,
10841084
ordered: bool = True,
1085-
bypass_doc_val: Optional[bool] = False,
1085+
bypass_doc_val: Optional[bool] = None,
10861086
collation: Optional[_CollationIn] = None,
10871087
array_filters: Optional[Sequence[Mapping[str, Any]]] = None,
10881088
hint: Optional[_IndexKeyHint] = None,
@@ -1128,7 +1128,7 @@ async def replace_one(
11281128
filter: Mapping[str, Any],
11291129
replacement: Mapping[str, Any],
11301130
upsert: bool = False,
1131-
bypass_document_validation: bool = False,
1131+
bypass_document_validation: Optional[bool] = None,
11321132
collation: Optional[_CollationIn] = None,
11331133
hint: Optional[_IndexKeyHint] = None,
11341134
session: Optional[AsyncClientSession] = None,
@@ -1237,7 +1237,7 @@ async def update_one(
12371237
filter: Mapping[str, Any],
12381238
update: Union[Mapping[str, Any], _Pipeline],
12391239
upsert: bool = False,
1240-
bypass_document_validation: bool = False,
1240+
bypass_document_validation: Optional[bool] = None,
12411241
collation: Optional[_CollationIn] = None,
12421242
array_filters: Optional[Sequence[Mapping[str, Any]]] = None,
12431243
hint: Optional[_IndexKeyHint] = None,
@@ -2948,6 +2948,7 @@ async def aggregate(
29482948
returning aggregate results using a cursor.
29492949
- `collation` (optional): An instance of
29502950
:class:`~pymongo.collation.Collation`.
2951+
- `bypassDocumentValidation` (bool): If ``True``, allows the write to opt-out of document level validation.
29512952
29522953
29532954
:return: A :class:`~pymongo.asynchronous.command_cursor.AsyncCommandCursor` over the result

pymongo/asynchronous/encryption.py

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -64,11 +64,6 @@
6464
from pymongo.asynchronous.cursor import AsyncCursor
6565
from pymongo.asynchronous.database import AsyncDatabase
6666
from pymongo.asynchronous.mongo_client import AsyncMongoClient
67-
from pymongo.asynchronous.pool import (
68-
_configured_socket,
69-
_get_timeout_details,
70-
_raise_connection_failure,
71-
)
7267
from pymongo.common import CONNECT_TIMEOUT
7368
from pymongo.daemon import _spawn_daemon
7469
from pymongo.encryption_options import AutoEncryptionOpts, RangeOpts
@@ -80,12 +75,17 @@
8075
NetworkTimeout,
8176
ServerSelectionTimeoutError,
8277
)
83-
from pymongo.network_layer import BLOCKING_IO_ERRORS, async_sendall
78+
from pymongo.network_layer import async_socket_sendall
8479
from pymongo.operations import UpdateOne
8580
from pymongo.pool_options import PoolOptions
81+
from pymongo.pool_shared import (
82+
_async_configured_socket,
83+
_get_timeout_details,
84+
_raise_connection_failure,
85+
)
8686
from pymongo.read_concern import ReadConcern
8787
from pymongo.results import BulkWriteResult, DeleteResult
88-
from pymongo.ssl_support import get_ssl_context
88+
from pymongo.ssl_support import BLOCKING_IO_ERRORS, get_ssl_context
8989
from pymongo.typings import _DocumentType, _DocumentTypeArg
9090
from pymongo.uri_parser_shared import parse_host
9191
from pymongo.write_concern import WriteConcern
@@ -113,7 +113,7 @@
113113

114114
async def _connect_kms(address: _Address, opts: PoolOptions) -> Union[socket.socket, _sslConn]:
115115
try:
116-
return await _configured_socket(address, opts)
116+
return await _async_configured_socket(address, opts)
117117
except Exception as exc:
118118
_raise_connection_failure(address, exc, timeout_details=_get_timeout_details(opts))
119119

@@ -196,7 +196,7 @@ async def kms_request(self, kms_context: MongoCryptKmsContext) -> None:
196196
try:
197197
conn = await _connect_kms(address, opts)
198198
try:
199-
await async_sendall(conn, message)
199+
await async_socket_sendall(conn, message)
200200
while kms_context.bytes_needed > 0:
201201
# CSOT: update timeout.
202202
conn.settimeout(max(_csot.clamp_remaining(_KMS_CONNECT_TIMEOUT), 0))

pymongo/asynchronous/mongo_client.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2079,7 +2079,7 @@ async def _cleanup_cursor_lock(
20792079
# exhausted the result set we *must* close the socket
20802080
# to stop the server from sending more data.
20812081
assert conn_mgr.conn is not None
2082-
conn_mgr.conn.close_conn(ConnectionClosedReason.ERROR)
2082+
await conn_mgr.conn.close_conn(ConnectionClosedReason.ERROR)
20832083
else:
20842084
await self._close_cursor_now(cursor_id, address, session=session, conn_mgr=conn_mgr)
20852085
if conn_mgr:

pymongo/asynchronous/monitor.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,11 @@
3636
from pymongo.server_description import ServerDescription
3737

3838
if TYPE_CHECKING:
39-
from pymongo.asynchronous.pool import AsyncConnection, Pool, _CancellationContext
39+
from pymongo.asynchronous.pool import ( # type: ignore[attr-defined]
40+
AsyncConnection,
41+
Pool,
42+
_CancellationContext,
43+
)
4044
from pymongo.asynchronous.settings import TopologySettings
4145
from pymongo.asynchronous.topology import Topology
4246

pymongo/asynchronous/network.py

Lines changed: 5 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
import datetime
1919
import logging
20-
import time
2120
from typing import (
2221
TYPE_CHECKING,
2322
Any,
@@ -31,20 +30,16 @@
3130

3231
from bson import _decode_all_selective
3332
from pymongo import _csot, helpers_shared, message
34-
from pymongo.common import MAX_MESSAGE_SIZE
35-
from pymongo.compression_support import _NO_COMPRESSION, decompress
33+
from pymongo.compression_support import _NO_COMPRESSION
3634
from pymongo.errors import (
3735
NotPrimaryError,
3836
OperationFailure,
39-
ProtocolError,
4037
)
4138
from pymongo.logger import _COMMAND_LOGGER, _CommandStatusMessage, _debug_log
42-
from pymongo.message import _UNPACK_REPLY, _OpMsg, _OpReply
39+
from pymongo.message import _OpMsg
4340
from pymongo.monitoring import _is_speculative_authenticate
4441
from pymongo.network_layer import (
45-
_UNPACK_COMPRESSION_HEADER,
46-
_UNPACK_HEADER,
47-
async_receive_data,
42+
async_receive_message,
4843
async_sendall,
4944
)
5045

@@ -194,13 +189,13 @@ async def command(
194189
)
195190

196191
try:
197-
await async_sendall(conn.conn, msg)
192+
await async_sendall(conn.conn.get_conn, msg)
198193
if use_op_msg and unacknowledged:
199194
# Unacknowledged, fake a successful command response.
200195
reply = None
201196
response_doc: _DocumentOut = {"ok": 1}
202197
else:
203-
reply = await receive_message(conn, request_id)
198+
reply = await async_receive_message(conn, request_id)
204199
conn.more_to_come = reply.more_to_come
205200
unpacked_docs = reply.unpack_response(
206201
codec_options=codec_options, user_fields=user_fields
@@ -301,47 +296,3 @@ async def command(
301296
)
302297

303298
return response_doc # type: ignore[return-value]
304-
305-
306-
async def receive_message(
307-
conn: AsyncConnection, request_id: Optional[int], max_message_size: int = MAX_MESSAGE_SIZE
308-
) -> Union[_OpReply, _OpMsg]:
309-
"""Receive a raw BSON message or raise socket.error."""
310-
if _csot.get_timeout():
311-
deadline = _csot.get_deadline()
312-
else:
313-
timeout = conn.conn.gettimeout()
314-
if timeout:
315-
deadline = time.monotonic() + timeout
316-
else:
317-
deadline = None
318-
# Ignore the response's request id.
319-
length, _, response_to, op_code = _UNPACK_HEADER(await async_receive_data(conn, 16, deadline))
320-
# No request_id for exhaust cursor "getMore".
321-
if request_id is not None:
322-
if request_id != response_to:
323-
raise ProtocolError(f"Got response id {response_to!r} but expected {request_id!r}")
324-
if length <= 16:
325-
raise ProtocolError(
326-
f"Message length ({length!r}) not longer than standard message header size (16)"
327-
)
328-
if length > max_message_size:
329-
raise ProtocolError(
330-
f"Message length ({length!r}) is larger than server max "
331-
f"message size ({max_message_size!r})"
332-
)
333-
if op_code == 2012:
334-
op_code, _, compressor_id = _UNPACK_COMPRESSION_HEADER(
335-
await async_receive_data(conn, 9, deadline)
336-
)
337-
data = decompress(await async_receive_data(conn, length - 25, deadline), compressor_id)
338-
else:
339-
data = await async_receive_data(conn, length - 16, deadline)
340-
341-
try:
342-
unpack_reply = _UNPACK_REPLY[op_code]
343-
except KeyError:
344-
raise ProtocolError(
345-
f"Got opcode {op_code!r} but expected {_UNPACK_REPLY.keys()!r}"
346-
) from None
347-
return unpack_reply(data)

0 commit comments

Comments
 (0)