Skip to content

Commit bc92d09

Browse files
committed
Use computed average document size to support usage through Atlas Data Federation
The new implementation stops relying on the storageStats property, which is not recognized as a valid property by the $collStats aggregation operation when using a Data Federation endpoint. This ended up making it impossible to use the SamplePartitioner, PaginateBySizePartitioner and AutoBucketPartitioner with a Data Federation endpoint. From what I could see, the storageStats property was only used to access avgObjSize, which can be computed from the size and number of documents of a collection. When connected to a federated Mongo instance, stats are retrieved via the collStats command, whereas the $collStats aggregation operator is used for standard Mongo instances. This difference is due to the collStats command being faster, but deprecated starting from Mongo 6.2. However, it doesn't seem to be deprecated for Data Federation as far as I can tell.
1 parent 4d372ae commit bc92d09

File tree

4 files changed

+74
-30
lines changed

4 files changed

+74
-30
lines changed

src/main/java/com/mongodb/spark/sql/connector/read/partitioner/AutoBucketPartitioner.java

+3-4
Original file line numberDiff line numberDiff line change
@@ -150,10 +150,6 @@ public List<MongoInputPartition> generatePartitions(final ReadConfig readConfig)
150150
return SINGLE_PARTITIONER.generatePartitions(readConfig);
151151
}
152152

153-
double avgObjSizeInBytes =
154-
storageStats.get("avgObjSize", new BsonInt32(0)).asNumber().doubleValue();
155-
double numDocumentsPerPartition = Math.floor(partitionSizeInBytes / avgObjSizeInBytes);
156-
157153
BsonDocument usersCollectionFilter =
158154
PartitionerHelper.matchQuery(readConfig.getAggregationPipeline());
159155
long count;
@@ -164,6 +160,9 @@ public List<MongoInputPartition> generatePartitions(final ReadConfig readConfig)
164160
usersCollectionFilter, new CountOptions().comment(readConfig.getComment())));
165161
}
166162

163+
double avgObjSizeInBytes = PartitionerHelper.averageDocumentSize(storageStats, count);
164+
double numDocumentsPerPartition = Math.floor(partitionSizeInBytes / avgObjSizeInBytes);
165+
167166
if (numDocumentsPerPartition == 0 || numDocumentsPerPartition >= count) {
168167
LOGGER.info(
169168
"Fewer documents ({}) than the calculated number of documents per partition ({}). Returning a single partition",

src/main/java/com/mongodb/spark/sql/connector/read/partitioner/PaginateBySizePartitioner.java

+14-13
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package com.mongodb.spark.sql.connector.read.partitioner;
1919

2020
import static com.mongodb.spark.sql.connector.read.partitioner.PartitionerHelper.SINGLE_PARTITIONER;
21+
import static com.mongodb.spark.sql.connector.read.partitioner.PartitionerHelper.matchQuery;
2122
import static java.lang.String.format;
2223

2324
import com.mongodb.client.model.CountOptions;
@@ -55,7 +56,7 @@ public PaginateBySizePartitioner() {}
5556
@Override
5657
public List<MongoInputPartition> generatePartitions(final ReadConfig readConfig) {
5758
MongoConfig partitionerOptions = readConfig.getPartitionerOptions();
58-
int partitionSizeBytes = Assertions.validateConfig(
59+
int partitionSizeInBytes = Assertions.validateConfig(
5960
partitionerOptions.getInt(PARTITION_SIZE_MB_CONFIG, PARTITION_SIZE_MB_DEFAULT),
6061
i -> i > 0,
6162
() ->
@@ -69,18 +70,6 @@ public List<MongoInputPartition> generatePartitions(final ReadConfig readConfig)
6970
return SINGLE_PARTITIONER.generatePartitions(readConfig);
7071
}
7172

72-
double avgObjSizeInBytes =
73-
storageStats.get("avgObjSize", new BsonInt32(0)).asNumber().doubleValue();
74-
if (avgObjSizeInBytes >= partitionSizeBytes) {
75-
LOGGER.warn(
76-
"Average document size `{}` is greater than the partition size `{}`. Please increase the partition size."
77-
+ "Returning a single partition.",
78-
avgObjSizeInBytes,
79-
partitionSizeBytes);
80-
return SINGLE_PARTITIONER.generatePartitions(readConfig);
81-
}
82-
83-
int numDocumentsPerPartition = (int) Math.floor(partitionSizeBytes / avgObjSizeInBytes);
8473
BsonDocument matchQuery = PartitionerHelper.matchQuery(readConfig.getAggregationPipeline());
8574
long count;
8675
if (matchQuery.isEmpty() && storageStats.containsKey("count")) {
@@ -90,6 +79,18 @@ public List<MongoInputPartition> generatePartitions(final ReadConfig readConfig)
9079
coll.countDocuments(matchQuery, new CountOptions().comment(readConfig.getComment())));
9180
}
9281

82+
double avgObjSizeInBytes = PartitionerHelper.averageDocumentSize(storageStats, count);
83+
int numDocumentsPerPartition = (int) Math.floor(partitionSizeInBytes / avgObjSizeInBytes);
84+
85+
if (avgObjSizeInBytes >= partitionSizeInBytes) {
86+
LOGGER.warn(
87+
"Average document size `{}` is greater than the partition size `{}`. Please increase the partition size."
88+
+ "Returning a single partition.",
89+
avgObjSizeInBytes,
90+
partitionSizeInBytes);
91+
return SINGLE_PARTITIONER.generatePartitions(readConfig);
92+
}
93+
9394
if (count <= numDocumentsPerPartition) {
9495
LOGGER.warn(
9596
"The calculated number of documents per partition {} is greater than or equal to the number of matching documents. "

src/main/java/com/mongodb/spark/sql/connector/read/partitioner/PartitionerHelper.java

+54-11
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@
1818
package com.mongodb.spark.sql.connector.read.partitioner;
1919

2020
import static com.mongodb.spark.sql.connector.read.partitioner.Partitioner.LOGGER;
21-
import static java.util.Collections.singletonList;
21+
import static java.lang.String.format;
22+
import static java.util.Arrays.asList;
2223

2324
import com.mongodb.MongoCommandException;
2425
import com.mongodb.client.MongoDatabase;
@@ -37,9 +38,12 @@
3738
/** Partitioner helper class, contains various utility methods used by the partitioner instances. */
3839
public final class PartitionerHelper {
3940

40-
private static final List<BsonDocument> COLL_STATS_AGGREGATION_PIPELINE =
41-
singletonList(BsonDocument.parse("{'$collStats': {'storageStats': { } } }"));
41+
private static final List<BsonDocument> COLL_STATS_AGGREGATION_PIPELINE = asList(
42+
BsonDocument.parse("{'$collStats': {'storageStats': { } } }"),
43+
BsonDocument.parse(
44+
"{'$project': {'size': '$storageStats.size', 'count': '$storageStats.count' } }"));
4245
private static final BsonDocument PING_COMMAND = BsonDocument.parse("{ping: 1}");
46+
private static final BsonDocument BUILD_INFO_COMMAND = BsonDocument.parse("{buildInfo: 1}");
4347
public static final Partitioner SINGLE_PARTITIONER = new SinglePartitionPartitioner();
4448

4549
/**
@@ -101,14 +105,34 @@ public static List<BsonDocument> createPartitionPipeline(
101105
public static BsonDocument storageStats(final ReadConfig readConfig) {
102106
LOGGER.info("Getting collection stats for: {}", readConfig.getNamespace().getFullName());
103107
try {
104-
return readConfig
105-
.withCollection(
106-
coll -> Optional.ofNullable(coll.aggregate(COLL_STATS_AGGREGATION_PIPELINE)
107-
.allowDiskUse(readConfig.getAggregationAllowDiskUse())
108-
.comment(readConfig.getComment())
109-
.first())
110-
.orElseGet(BsonDocument::new))
111-
.getDocument("storageStats", new BsonDocument());
108+
BsonDocument buildInfo = readConfig.withClient(c -> {
109+
MongoDatabase db = c.getDatabase(readConfig.getDatabaseName());
110+
return db.runCommand(BUILD_INFO_COMMAND).toBsonDocument();
111+
});
112+
113+
// Atlas Data Federation does not support the storageStats property and requires
114+
// special handling to return the federated collection stats.
115+
if (!buildInfo.containsKey("dataLake")) {
116+
return readConfig.withClient(c -> {
117+
MongoDatabase db = c.getDatabase(readConfig.getDatabaseName());
118+
BsonDocument command =
119+
BsonDocument.parse(format("{ collStats: '%s' }", readConfig.getCollectionName()));
120+
BsonDocument result = db.runCommand(command).toBsonDocument();
121+
122+
BsonDocument formattedResult = new BsonDocument();
123+
formattedResult.append("count", result.get("count"));
124+
formattedResult.append("size", result.get("size"));
125+
126+
return formattedResult;
127+
});
128+
}
129+
130+
return readConfig.withCollection(
131+
coll -> Optional.ofNullable(coll.aggregate(COLL_STATS_AGGREGATION_PIPELINE)
132+
.allowDiskUse(readConfig.getAggregationAllowDiskUse())
133+
.comment(readConfig.getComment())
134+
.first())
135+
.orElseGet(BsonDocument::new));
112136
} catch (RuntimeException ex) {
113137
if (ex instanceof MongoCommandException
114138
&& (ex.getMessage().contains("not found.")
@@ -138,5 +162,24 @@ public static List<String> getPreferredLocations(final ReadConfig readConfig) {
138162
.collect(Collectors.toList());
139163
}
140164

165+
/**
166+
* Returns the average document size in a collection, either using {@code avgObjSize}
167+
* or calculated from document count and collection size.
168+
*
169+
* @param storageStats the storage stats of a collection
170+
* @param documentCount the number of documents in a collection
171+
* @return the average document size in a collection
172+
*/
173+
public static double averageDocumentSize(final BsonDocument storageStats, final long documentCount) {
174+
if (storageStats.containsKey("avgObjSize")) {
175+
return storageStats.get("avgObjSize", new BsonInt32(0)).asNumber().doubleValue();
176+
}
177+
178+
long size = storageStats.getNumber("size").longValue();
179+
double avgObjSizeInBytes = Math.floor(size / documentCount);
180+
181+
return avgObjSizeInBytes;
182+
}
183+
141184
private PartitionerHelper() {}
142185
}

src/main/java/com/mongodb/spark/sql/connector/read/partitioner/SamplePartitioner.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package com.mongodb.spark.sql.connector.read.partitioner;
1919

2020
import static com.mongodb.spark.sql.connector.read.partitioner.PartitionerHelper.SINGLE_PARTITIONER;
21+
import static com.mongodb.spark.sql.connector.read.partitioner.PartitionerHelper.matchQuery;
2122
import static java.lang.String.format;
2223
import static java.util.Arrays.asList;
2324

@@ -105,8 +106,8 @@ public List<MongoInputPartition> generatePartitions(final ReadConfig readConfig)
105106
count = readConfig.withCollection(coll ->
106107
coll.countDocuments(matchQuery, new CountOptions().comment(readConfig.getComment())));
107108
}
108-
double avgObjSizeInBytes =
109-
storageStats.get("avgObjSize", new BsonInt32(0)).asNumber().doubleValue();
109+
110+
double avgObjSizeInBytes = PartitionerHelper.averageDocumentSize(storageStats, count);
110111
double numDocumentsPerPartition = Math.floor(partitionSizeInBytes / avgObjSizeInBytes);
111112

112113
if (numDocumentsPerPartition >= count) {

0 commit comments

Comments
 (0)