Skip to content

feat(NODE-6882): close outstanding connections #4499

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions src/cmap/connection_pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import {
} from '../constants';
import {
type AnyError,
ConnectionPoolClosedError,
type MongoError,
MongoInvalidArgumentError,
MongoMissingCredentialsError,
Expand Down Expand Up @@ -489,6 +490,16 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
}
}

closeCheckedOutConnections() {
for (const conn of this.checkedOut) {
this.emitAndLog(
ConnectionPool.CONNECTION_CLOSED,
new ConnectionClosedEvent(this, conn, 'poolClosed')
);
conn.onError(new ConnectionPoolClosedError());
}
}

/** Close the pool */
close(): void {
if (this.closed) {
Expand Down
56 changes: 56 additions & 0 deletions src/error.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1018,6 +1018,62 @@ export class MongoTopologyClosedError extends MongoAPIError {
}
}

/**
* An error generated when the MongoClient is closed and async
* operations are interrupted.
*
* @public
* @category Error
*/
export class MongoClientClosedError extends MongoAPIError {
/**
* **Do not use this constructor!**
*
* Meant for internal use only.
*
* @remarks
* This class is only meant to be constructed within the driver. This constructor is
* not subject to semantic versioning compatibility guarantees and may change at any time.
*
* @public
**/
constructor(message = 'MongoClient is closed') {
super(message);
}

override get name(): string {
return 'MongoClientClosedError';
}
}

/**
* An error generated when a ConnectionPool is closed and async
* operations are interrupted.
*
* @public
* @category Error
*/
export class ConnectionPoolClosedError extends MongoAPIError {
/**
* **Do not use this constructor!**
*
* Meant for internal use only.
*
* @remarks
* This class is only meant to be constructed within the driver. This constructor is
* not subject to semantic versioning compatibility guarantees and may change at any time.
*
* @public
**/
constructor(message = 'ConnectionPool is closed') {
super(message);
}

override get name(): string {
return 'ConnectionPoolClosedError';
}
}

/** @public */
export interface MongoNetworkErrorOptions {
/** Indicates the timeout happened before a connection handshake completed */
Expand Down
2 changes: 2 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ export {
export { ClientEncryption } from './client-side-encryption/client_encryption';
export { ChangeStreamCursor } from './cursor/change_stream_cursor';
export {
ConnectionPoolClosedError,
MongoAPIError,
MongoAWSError,
MongoAzureError,
Expand All @@ -53,6 +54,7 @@ export {
MongoClientBulkWriteCursorError,
MongoClientBulkWriteError,
MongoClientBulkWriteExecutionError,
MongoClientClosedError,
MongoCompatibilityError,
MongoCursorExhaustedError,
MongoCursorInUseError,
Expand Down
6 changes: 5 additions & 1 deletion src/mongo_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -636,7 +636,9 @@ export class MongoClient extends TypedEventEmitter<MongoClientEvents> implements
}

/**
* Cleans up client-side resources used by the MongoCLient and . This includes:
* Cleans up client-side resources used by the MongoClient.
*
* This includes:
*
* - Closes all open, unused connections (see note).
* - Ends all in-use sessions with {@link ClientSession#endSession|ClientSession.endSession()}.
Expand Down Expand Up @@ -672,6 +674,8 @@ export class MongoClient extends TypedEventEmitter<MongoClientEvents> implements
writable: false
});

this.topology?.closeCheckedOutConnections();

const activeCursorCloses = Array.from(this.s.activeCursors, cursor => cursor.close());
this.s.activeCursors.clear();

Expand Down
6 changes: 5 additions & 1 deletion src/sdam/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -246,8 +246,12 @@ export class Server extends TypedEventEmitter<ServerEvents> {
}
}

closeCheckedOutConnections() {
return this.pool.closeCheckedOutConnections();
}

/** Destroy the server connection */
destroy(): void {
close(): void {
if (this.s.state === STATE_CLOSED) {
return;
}
Expand Down
14 changes: 10 additions & 4 deletions src/sdam/topology.ts
Original file line number Diff line number Diff line change
Expand Up @@ -490,14 +490,20 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
}
}

closeCheckedOutConnections() {
for (const server of this.s.servers.values()) {
return server.closeCheckedOutConnections();
}
}

/** Close this topology */
close(): void {
if (this.s.state === STATE_CLOSED || this.s.state === STATE_CLOSING) {
return;
}

for (const server of this.s.servers.values()) {
destroyServer(server, this);
closeServer(server, this);
}

this.s.servers.clear();
Expand Down Expand Up @@ -791,12 +797,12 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
}

/** Destroys a server, and removes all event listeners from the instance */
function destroyServer(server: Server, topology: Topology) {
function closeServer(server: Server, topology: Topology) {
for (const event of LOCAL_SERVER_EVENTS) {
server.removeAllListeners(event);
}

server.destroy();
server.close();
topology.emitAndLog(
Topology.SERVER_CLOSED,
new ServerClosedEvent(topology.s.id, server.description.address)
Expand Down Expand Up @@ -903,7 +909,7 @@ function updateServers(topology: Topology, incomingServerDescription?: ServerDes

// prepare server for garbage collection
if (server) {
destroyServer(server, topology);
closeServer(server, topology);
}
}
}
Expand Down
11 changes: 8 additions & 3 deletions test/integration/change-streams/change_stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ describe('Change Streams', function () {
await csDb.createCollection('test').catch(() => null);
collection = csDb.collection('test');
changeStream = collection.watch();
changeStream.on('error', () => null);
});

afterEach(async () => {
Expand Down Expand Up @@ -702,15 +703,19 @@ describe('Change Streams', function () {

const outStream = new PassThrough({ objectMode: true });

// @ts-expect-error: transform requires a Document return type
changeStream.stream({ transform: JSON.stringify }).pipe(outStream);
const transform = doc => ({ doc: JSON.stringify(doc) });
changeStream
.stream({ transform })
.on('error', () => null)
.pipe(outStream)
.on('error', () => null);

const willBeData = once(outStream, 'data');

await collection.insertMany([{ a: 1 }]);

const [data] = await willBeData;
const parsedEvent = JSON.parse(data);
const parsedEvent = JSON.parse(data.doc);
expect(parsedEvent).to.have.nested.property('fullDocument.a', 1);

outStream.destroy();
Expand Down
8 changes: 6 additions & 2 deletions test/integration/crud/misc_cursors.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ const { Writable } = require('stream');
const { once, on } = require('events');
const { setTimeout } = require('timers');
const { ReadPreference } = require('../../mongodb');
const { ServerType } = require('../../mongodb');
const { ServerType, ConnectionPoolClosedError } = require('../../mongodb');
const { formatSort } = require('../../mongodb');

describe('Cursor', function () {
Expand Down Expand Up @@ -1872,7 +1872,11 @@ describe('Cursor', function () {
expect(cursor).to.have.property('closed', true);

const error = await rejectedEarlyBecauseClientClosed;
expect(error).to.be.null; // TODO(NODE-6632): This should throw again after the client signal aborts the in-progress next call
if (this.configuration.topologyType === 'LoadBalanced') {
expect(error).to.be.instanceOf(ConnectionPoolClosedError);
} else {
expect(error).to.be.null;
}
});

it('shouldAwaitData', {
Expand Down
17 changes: 8 additions & 9 deletions test/integration/index_management.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -770,20 +770,19 @@ describe('Indexes', function () {

expect(events).to.be.an('array').with.lengthOf(1);
expect(events[0]).nested.property('command.commitQuorum').to.equal(0);
await collection.drop(err => {
expect(err).to.not.exist;
});
await collection.drop();
}
};
}
it(
'should run command with commitQuorum if specified on db.createIndex',
commitQuorumTest((db, collection) =>
db.createIndex(collection.collectionName, 'a', {
// @ts-expect-error revaluate this?
writeConcern: { w: 'majority' },
commitQuorum: 0
})
commitQuorumTest(
async (db, collection) =>
await db.createIndex(collection.collectionName, 'a', {
// @ts-expect-error revaluate this?
writeConcern: { w: 'majority' },
commitQuorum: 0
})
)
);
it(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ describe('examples(causal-consistency):', function () {
it('supports causal consistency', async function () {
const session = client.startSession({ causalConsistency: true });

collection.insertOne({ darmok: 'jalad' }, { session });
collection.updateOne({ darmok: 'jalad' }, { $set: { darmok: 'tanagra' } }, { session });
await collection.insertOne({ darmok: 'jalad' }, { session });
await collection.updateOne({ darmok: 'jalad' }, { $set: { darmok: 'tanagra' } }, { session });

const results = await collection.find({}, { session }).toArray();

Expand Down
4 changes: 2 additions & 2 deletions test/integration/node-specific/mongo_client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -835,15 +835,15 @@ describe('class MongoClient', function () {
client.on('commandStarted', ev => ev.commandName === 'killCursors' && kills.push(ev));
});

it('are all closed', async () => {
it('are all closed', async function () {
const cursors = Array.from({ length: 30 }, (_, skip) =>
collection.find({}, { skip, batchSize: 1 })
);
await Promise.all(cursors.map(c => c.next()));
expect(client.s.activeCursors).to.have.lengthOf(30);
await client.close();
expect(client.s.activeCursors).to.have.lengthOf(0);
expect(kills).to.have.lengthOf(30);
expect(kills).to.have.lengthOf(this.configuration.topologyType === 'LoadBalanced' ? 0 : 30);
});

it('creating cursors after close adds to activeCursors', async () => {
Expand Down
2 changes: 2 additions & 0 deletions test/unit/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ const EXPECTED_EXPORTS = [
'ConnectionClosedEvent',
'ConnectionCreatedEvent',
'ConnectionPoolClearedEvent',
'ConnectionPoolClosedError',
'ConnectionPoolClosedEvent',
'ConnectionPoolCreatedEvent',
'ConnectionPoolMonitoringEvent',
Expand Down Expand Up @@ -71,6 +72,7 @@ const EXPECTED_EXPORTS = [
'MongoClientBulkWriteCursorError',
'MongoClientBulkWriteError',
'MongoClientBulkWriteExecutionError',
'MongoClientClosedError',
'MongoCompatibilityError',
'MongoCryptAzureKMSRequestError',
'MongoCryptCreateDataKeyError',
Expand Down