Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use computed average document size to support usage through Atlas Data Federation #132

Open
wants to merge 1 commit into
base: main
Choose a base branch
from

Conversation

guillotjulien
Copy link

@guillotjulien guillotjulien commented Feb 14, 2025

The new implementation stops relying on the storageStats property that is not being recognized as a valid property when using the $collStats aggregation operation with a Data Federation endpoint.
This end up making it impossible to use the SamplePartitioner, PaginateBySizePartitioner and AutoBucketPartitioner in that situation.

Example:

Caused by: com.mongodb.MongoCommandException: Command failed with error 9 (FailedToParse): '$collStats param 'storageStats' is not valid for Atlas Data Federation, correlationID = 182415f1c5629184818f0150' on server <REDACTED>. The full response is {"ok": 0, "errmsg": "$collStats param 'storageStats' is not valid for Atlas Data Federation, correlationID = 182415f1c5629184818f0150", "code": 9, "codeName": "FailedToParse"}
        at com.mongodb.internal.connection.ProtocolHelper.getCommandFailureException(ProtocolHelper.java:198)
        at com.mongodb.internal.connection.InternalStreamConnection.receiveCommandMessageResponse(InternalStreamConnection.java:416)
        at com.mongodb.internal.connection.InternalStreamConnection.sendAndReceive(InternalStreamConnection.java:340)
        at com.mongodb.internal.connection.UsageTrackingInternalConnection.sendAndReceive(UsageTrackingInternalConnection.java:116)
        at com.mongodb.internal.connection.DefaultConnectionPool$PooledConnection.sendAndReceive(DefaultConnectionPool.java:643)
        at com.mongodb.internal.connection.CommandProtocolImpl.execute(CommandProtocolImpl.java:71)
        at com.mongodb.internal.connection.DefaultServer$DefaultServerProtocolExecutor.execute(DefaultServer.java:206)
        at com.mongodb.internal.connection.DefaultServerConnection.executeProtocol(DefaultServerConnection.java:119)
        at com.mongodb.internal.connection.DefaultServerConnection.command(DefaultServerConnection.java:85)
        at com.mongodb.internal.connection.DefaultServerConnection.command(DefaultServerConnection.java:75)
        at com.mongodb.internal.connection.DefaultServer$OperationCountTrackingConnection.command(DefaultServer.java:293)
        at com.mongodb.internal.operation.CommandOperationHelper.createReadCommandAndExecute(CommandOperationHelper.java:233)
        at com.mongodb.internal.operation.CommandOperationHelper.lambda$executeRetryableRead$4(CommandOperationHelper.java:215)
        at com.mongodb.internal.operation.OperationHelper.lambda$withSourceAndConnection$0(OperationHelper.java:356)
        at com.mongodb.internal.operation.OperationHelper.withSuppliedResource(OperationHelper.java:381)
        at com.mongodb.internal.operation.OperationHelper.lambda$withSourceAndConnection$1(OperationHelper.java:355)
        at com.mongodb.internal.operation.OperationHelper.withSuppliedResource(OperationHelper.java:381)
        at com.mongodb.internal.operation.OperationHelper.withSourceAndConnection(OperationHelper.java:354)
        at com.mongodb.internal.operation.CommandOperationHelper.lambda$executeRetryableRead$5(CommandOperationHelper.java:213)
        at com.mongodb.internal.async.function.RetryingSyncSupplier.get(RetryingSyncSupplier.java:67)
        at com.mongodb.internal.operation.CommandOperationHelper.executeRetryableRead(CommandOperationHelper.java:218)
        at com.mongodb.internal.operation.CommandOperationHelper.executeRetryableRead(CommandOperationHelper.java:199)
        at com.mongodb.internal.operation.AggregateOperationImpl.execute(AggregateOperationImpl.java:194)
        at com.mongodb.internal.operation.AggregateOperation.execute(AggregateOperation.java:150)
        at com.mongodb.internal.operation.AggregateOperation.execute(AggregateOperation.java:44)
        at com.mongodb.client.internal.MongoClientDelegate$DelegateOperationExecutor.execute(MongoClientDelegate.java:191)
        at com.mongodb.client.internal.MongoIterableImpl.execute(MongoIterableImpl.java:133)
        at com.mongodb.client.internal.MongoIterableImpl.iterator(MongoIterableImpl.java:90)
        at com.mongodb.client.internal.MongoIterableImpl.first(MongoIterableImpl.java:101)
        at com.mongodb.spark.sql.connector.read.partitioner.PartitionerHelper.lambda$storageStats$0(PartitionerHelper.java:103)
        at com.mongodb.spark.sql.connector.config.AbstractMongoConfig.withCollection(AbstractMongoConfig.java:164)
        at com.mongodb.spark.sql.connector.config.ReadConfig.withCollection(ReadConfig.java:45)
        at com.mongodb.spark.sql.connector.read.partitioner.PartitionerHelper.storageStats(PartitionerHelper.java:99)
        ... 85 more

From what I could see, the storageStats property was only used to access avgObjSize, which can be computed from the size and number of documents of a collection.

When connected to a federated Mongo instance, stats are retrieved via the collStats command, whereas the $collStats aggregation operator is used for standard Mongo instances. This difference is due to the collStats command being faster, but deprecated starting from Mongo 6.2. However it doesn't seem to be deprecated for Data Federation as far as I can tell (from https://www.mongodb.com/docs/atlas/data-federation/supported-unsupported/diagnostic-commands/#collstats).

@rozza rozza self-requested a review February 25, 2025 16:36
@rozza
Copy link
Member

rozza commented Feb 25, 2025

Hi @guillotjulien,

Thanks for the PR. I have added a ticket SPARK-442 to track calculating the {{avgObjSize}} if not available.

@rozza rozza added the external label Mar 4, 2025
@rozza rozza marked this pull request as draft March 4, 2025 10:18
@rozza rozza marked this pull request as ready for review March 4, 2025 17:30
Copy link
Member

@rozza rozza left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this looks worthwhile. I would recommend getting avgObjSize if available else calculating it.

…a Federation

The new implementation stops relying on the storageStats property not being recognized as a valid property when using the $collStats aggregation operation when using a Data Federation endpoint.
This end up making it impossible to use the SamplePartitioner, PaginateBySizePartitioner and AutoBucketPartitioner when using a Data Federation endpoint.

From what I could see, the storageStats property was only used to access avgObjSize, which can be computed from the size and number of documents of a collection.

When connected to a federated Mongo instance, stats are retrieved via the collStats command, whereas the $collStats aggregation operator is used for standard Mongo instances.
This difference is due to the collStats command being faster, but deprecated starting from Mongo 6.2. However it doesn't seem to be deprecated for Data Federation as far as I can tell.
@guillotjulien guillotjulien force-pushed the fix/data-federation-support branch from 453aa29 to bc92d09 Compare March 6, 2025 10:36
@guillotjulien guillotjulien requested a review from rozza March 6, 2025 10:36
@guillotjulien
Copy link
Author

guillotjulien commented Mar 6, 2025

I extracted the logic to get the average object size in PartitionerHelper so that all partitioners use the same logic.
We're using avgObjSize when available now, otherwise we compute it.

I retested locally, and SamplePartitioner can be used for both standard replicaset, and data federation endpoints.

@guillotjulien
Copy link
Author

Hi @rozza, any chance you'd have time to look at this again?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants