Use computed average document size to support usage through Atlas Data Federation #132

Open · wants to merge 1 commit into base: main

@@ -150,10 +150,6 @@ public List<MongoInputPartition> generatePartitions(final ReadConfig readConfig)
       return SINGLE_PARTITIONER.generatePartitions(readConfig);
     }

-    double avgObjSizeInBytes =
-        storageStats.get("avgObjSize", new BsonInt32(0)).asNumber().doubleValue();
-    double numDocumentsPerPartition = Math.floor(partitionSizeInBytes / avgObjSizeInBytes);
-
     BsonDocument usersCollectionFilter =
         PartitionerHelper.matchQuery(readConfig.getAggregationPipeline());
     long count;
@@ -164,6 +160,9 @@ public List<MongoInputPartition> generatePartitions(final ReadConfig readConfig)
           usersCollectionFilter, new CountOptions().comment(readConfig.getComment())));
     }

+    double avgObjSizeInBytes = PartitionerHelper.averageDocumentSize(storageStats, count);
+    double numDocumentsPerPartition = Math.floor(partitionSizeInBytes / avgObjSizeInBytes);
+
     if (numDocumentsPerPartition == 0 || numDocumentsPerPartition >= count) {
       LOGGER.info(
           "Fewer documents ({}) than the calculated number of documents per partition ({}). Returning a single partition",
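The reordering is the point of this hunk: `PartitionerHelper.averageDocumentSize` can fall back to dividing collection size by the matching-document count, so `count` has to exist before the average is computed. A minimal standalone sketch of the resulting arithmetic (class name and all values are illustrative, not from this PR):

    // Illustrative arithmetic only; not connector code.
    public final class PartitionMathSketch {
      public static void main(String[] args) {
        long partitionSizeInBytes = 64L * 1024 * 1024; // 64 MB partition target
        long collectionSizeInBytes = 2L * 1024 * 1024 * 1024; // "size" from collection stats
        long count = 4_000_000L; // matching document count

        // Fallback used when storageStats carries no avgObjSize (the Data Federation case):
        double avgObjSizeInBytes = Math.floor((double) collectionSizeInBytes / count); // 536.0

        double numDocumentsPerPartition = Math.floor(partitionSizeInBytes / avgObjSizeInBytes);
        System.out.printf("avg=%.0f bytes, docs/partition=%.0f%n",
            avgObjSizeInBytes, numDocumentsPerPartition); // avg=536, docs/partition=125203
      }
    }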
PaginateBySizePartitioner.java

@@ -18,6 +18,7 @@
 package com.mongodb.spark.sql.connector.read.partitioner;

 import static com.mongodb.spark.sql.connector.read.partitioner.PartitionerHelper.SINGLE_PARTITIONER;
+import static com.mongodb.spark.sql.connector.read.partitioner.PartitionerHelper.matchQuery;
 import static java.lang.String.format;

 import com.mongodb.client.model.CountOptions;
@@ -55,7 +56,7 @@ public PaginateBySizePartitioner() {}
   @Override
   public List<MongoInputPartition> generatePartitions(final ReadConfig readConfig) {
     MongoConfig partitionerOptions = readConfig.getPartitionerOptions();
-    int partitionSizeBytes = Assertions.validateConfig(
+    int partitionSizeInBytes = Assertions.validateConfig(
         partitionerOptions.getInt(PARTITION_SIZE_MB_CONFIG, PARTITION_SIZE_MB_DEFAULT),
         i -> i > 0,
         () ->
@@ -69,18 +70,6 @@ public List<MongoInputPartition> generatePartitions(final ReadConfig readConfig)
       return SINGLE_PARTITIONER.generatePartitions(readConfig);
     }

-    double avgObjSizeInBytes =
-        storageStats.get("avgObjSize", new BsonInt32(0)).asNumber().doubleValue();
-    if (avgObjSizeInBytes >= partitionSizeBytes) {
-      LOGGER.warn(
-          "Average document size `{}` is greater than the partition size `{}`. Please increase the partition size."
-              + "Returning a single partition.",
-          avgObjSizeInBytes,
-          partitionSizeBytes);
-      return SINGLE_PARTITIONER.generatePartitions(readConfig);
-    }
-
-    int numDocumentsPerPartition = (int) Math.floor(partitionSizeBytes / avgObjSizeInBytes);
     BsonDocument matchQuery = PartitionerHelper.matchQuery(readConfig.getAggregationPipeline());
     long count;
     if (matchQuery.isEmpty() && storageStats.containsKey("count")) {
@@ -90,6 +79,18 @@ public List<MongoInputPartition> generatePartitions(final ReadConfig readConfig)
           coll.countDocuments(matchQuery, new CountOptions().comment(readConfig.getComment())));
     }

+    double avgObjSizeInBytes = PartitionerHelper.averageDocumentSize(storageStats, count);
+    int numDocumentsPerPartition = (int) Math.floor(partitionSizeInBytes / avgObjSizeInBytes);
+
+    if (avgObjSizeInBytes >= partitionSizeInBytes) {
+      LOGGER.warn(
+          "Average document size `{}` is greater than the partition size `{}`. Please increase the partition size. "
+              + "Returning a single partition.",
+          avgObjSizeInBytes,
+          partitionSizeInBytes);
+      return SINGLE_PARTITIONER.generatePartitions(readConfig);
+    }
+
     if (count <= numDocumentsPerPartition) {
       LOGGER.warn(
           "The calculated number of documents per partition {} is greater than or equal to the number of matching documents. "
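With the average now computed after the count, the oversized-document guard still short-circuits to a single partition before any pagination happens. A rough sketch of the decision order, where the final `ceil` is my stand-in for the real pagination logic (class and method names are mine, not the connector's):

    // Sketch of the guard order above; illustrative only, not connector code.
    public final class PaginationGuardSketch {
      static int partitionsFor(double avgObjSizeInBytes, int partitionSizeInBytes, long count) {
        int numDocumentsPerPartition = (int) Math.floor(partitionSizeInBytes / avgObjSizeInBytes);
        if (avgObjSizeInBytes >= partitionSizeInBytes) {
          return 1; // stands in for SINGLE_PARTITIONER
        }
        if (count <= numDocumentsPerPartition) {
          return 1;
        }
        return (int) Math.ceil((double) count / numDocumentsPerPartition);
      }

      public static void main(String[] args) {
        System.out.println(partitionsFor(512.0, 64 * 1024 * 1024, 1_000_000L)); // 8
        System.out.println(partitionsFor(2.0 * 64 * 1024 * 1024, 64 * 1024 * 1024, 10L)); // 1
      }
    }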
PartitionerHelper.java

@@ -18,7 +18,8 @@
 package com.mongodb.spark.sql.connector.read.partitioner;

 import static com.mongodb.spark.sql.connector.read.partitioner.Partitioner.LOGGER;
-import static java.util.Collections.singletonList;
+import static java.lang.String.format;
+import static java.util.Arrays.asList;

 import com.mongodb.MongoCommandException;
 import com.mongodb.client.MongoDatabase;
@@ -37,9 +38,12 @@
 /** Partitioner helper class, contains various utility methods used by the partitioner instances. */
 public final class PartitionerHelper {

-  private static final List<BsonDocument> COLL_STATS_AGGREGATION_PIPELINE =
-      singletonList(BsonDocument.parse("{'$collStats': {'storageStats': { } } }"));
+  private static final List<BsonDocument> COLL_STATS_AGGREGATION_PIPELINE = asList(
+      BsonDocument.parse("{'$collStats': {'storageStats': { } } }"),
+      BsonDocument.parse(
+          "{'$project': {'size': '$storageStats.size', 'count': '$storageStats.count' } }"));
   private static final BsonDocument PING_COMMAND = BsonDocument.parse("{ping: 1}");
+  private static final BsonDocument BUILD_INFO_COMMAND = BsonDocument.parse("{buildInfo: 1}");
   public static final Partitioner SINGLE_PARTITIONER = new SinglePartitionPartitioner();

   /**
@@ -101,14 +105,34 @@ public static List<BsonDocument> createPartitionPipeline(
   public static BsonDocument storageStats(final ReadConfig readConfig) {
     LOGGER.info("Getting collection stats for: {}", readConfig.getNamespace().getFullName());
     try {
-      return readConfig
-          .withCollection(
-              coll -> Optional.ofNullable(coll.aggregate(COLL_STATS_AGGREGATION_PIPELINE)
-                  .allowDiskUse(readConfig.getAggregationAllowDiskUse())
-                  .comment(readConfig.getComment())
-                  .first())
-                  .orElseGet(BsonDocument::new))
-          .getDocument("storageStats", new BsonDocument());
+      BsonDocument buildInfo = readConfig.withClient(c -> {
+        MongoDatabase db = c.getDatabase(readConfig.getDatabaseName());
+        return db.runCommand(BUILD_INFO_COMMAND).toBsonDocument();
+      });
+
+      // Atlas Data Federation does not support the storageStats property and requires
+      // special handling to return the federated collection stats.
+      if (buildInfo.containsKey("dataLake")) {
+        return readConfig.withClient(c -> {
+          MongoDatabase db = c.getDatabase(readConfig.getDatabaseName());
+          BsonDocument command =
+              BsonDocument.parse(format("{ collStats: '%s' }", readConfig.getCollectionName()));
+          BsonDocument result = db.runCommand(command).toBsonDocument();
+
+          BsonDocument formattedResult = new BsonDocument();
+          formattedResult.append("count", result.get("count"));
+          formattedResult.append("size", result.get("size"));
+
+          return formattedResult;
+        });
+      }
+
+      return readConfig.withCollection(
+          coll -> Optional.ofNullable(coll.aggregate(COLL_STATS_AGGREGATION_PIPELINE)
+              .allowDiskUse(readConfig.getAggregationAllowDiskUse())
+              .comment(readConfig.getComment())
+              .first())
+              .orElseGet(BsonDocument::new));
     } catch (RuntimeException ex) {
       if (ex instanceof MongoCommandException
           && (ex.getMessage().contains("not found.")
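Both branches now yield a stats document exposing `size` and `count`, the two fields the new `averageDocumentSize` helper consumes. A standalone probe of just the detection step, assuming only the MongoDB Java sync driver (the URI and database name are placeholders):

    // Standalone probe sketch; assumes the MongoDB Java sync driver on the classpath.
    import com.mongodb.client.MongoClient;
    import com.mongodb.client.MongoClients;
    import com.mongodb.client.MongoDatabase;
    import org.bson.BsonDocument;
    import org.bson.Document;

    public final class DataFederationProbe {
      public static void main(String[] args) {
        try (MongoClient client = MongoClients.create("mongodb://localhost:27017")) { // placeholder URI
          MongoDatabase db = client.getDatabase("test"); // placeholder database
          BsonDocument buildInfo = db.runCommand(new Document("buildInfo", 1)).toBsonDocument();
          // Atlas Data Federation reports a `dataLake` subdocument in buildInfo.
          boolean isDataFederation = buildInfo.containsKey("dataLake");
          System.out.println(isDataFederation
              ? "Federated: use the collStats command for count/size"
              : "Regular server: use the $collStats aggregation stage");
        }
      }
    }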
@@ -138,5 +162,24 @@ public static List<String> getPreferredLocations(final ReadConfig readConfig) {
         .collect(Collectors.toList());
   }

+  /**
+   * Returns the average document size in a collection, either taken from {@code avgObjSize}
+   * or calculated from the document count and collection size.
+   *
+   * @param storageStats the storage stats of a collection
+   * @param documentCount the number of documents in a collection
+   * @return the average document size in a collection
+   */
+  public static double averageDocumentSize(final BsonDocument storageStats, final long documentCount) {
+    if (storageStats.containsKey("avgObjSize")) {
+      return storageStats.get("avgObjSize", new BsonInt32(0)).asNumber().doubleValue();
+    }
+
+    long size = storageStats.getNumber("size").longValue();
+    double avgObjSizeInBytes = Math.floor((double) size / documentCount);
+
+    return avgObjSizeInBytes;
+  }
+
   private PartitionerHelper() {}
 }
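A quick check of the helper's two paths, assuming the connector classes above are on the classpath (the stats documents and values are illustrative):

    // Usage sketch for PartitionerHelper.averageDocumentSize; illustrative values.
    import com.mongodb.spark.sql.connector.read.partitioner.PartitionerHelper;
    import org.bson.BsonDocument;

    public final class AverageSizeSketch {
      public static void main(String[] args) {
        // Path 1: avgObjSize present (regular server) -> used directly.
        BsonDocument withAvg = BsonDocument.parse("{avgObjSize: 512, size: 4096, count: 8}");
        System.out.println(PartitionerHelper.averageDocumentSize(withAvg, 8)); // 512.0

        // Path 2: no avgObjSize (Atlas Data Federation) -> floor(size / count).
        BsonDocument federated = BsonDocument.parse("{size: 4100, count: 8}");
        System.out.println(PartitionerHelper.averageDocumentSize(federated, 8)); // 512.0 (floor of 512.5)
      }
    }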
@@ -18,6 +18,7 @@
 package com.mongodb.spark.sql.connector.read.partitioner;

 import static com.mongodb.spark.sql.connector.read.partitioner.PartitionerHelper.SINGLE_PARTITIONER;
+import static com.mongodb.spark.sql.connector.read.partitioner.PartitionerHelper.matchQuery;
 import static java.lang.String.format;
 import static java.util.Arrays.asList;

@@ -105,8 +106,8 @@ public List<MongoInputPartition> generatePartitions(final ReadConfig readConfig)
       count = readConfig.withCollection(coll ->
           coll.countDocuments(matchQuery, new CountOptions().comment(readConfig.getComment())));
     }
-    double avgObjSizeInBytes =
-        storageStats.get("avgObjSize", new BsonInt32(0)).asNumber().doubleValue();
+
+    double avgObjSizeInBytes = PartitionerHelper.averageDocumentSize(storageStats, count);
     double numDocumentsPerPartition = Math.floor(partitionSizeInBytes / avgObjSizeInBytes);

     if (numDocumentsPerPartition >= count) {
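Taken together, the size-based partitioners no longer require `avgObjSize` from `storageStats`, so they can run against Atlas Data Federation endpoints. A hedged end-to-end sketch of a read through the connector (URI, namespace, and option values are placeholders; the option keys follow the connector's documented `partitioner.options.*` pattern):

    // Spark read sketch; placeholder URI/namespace, assumes Spark plus the MongoDB connector.
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    public final class FederatedReadSketch {
      public static void main(String[] args) {
        SparkSession spark =
            SparkSession.builder().master("local[*]").appName("adf-read").getOrCreate();
        Dataset<Row> df = spark.read()
            .format("mongodb")
            .option("connection.uri", "mongodb://<federated-endpoint>/") // placeholder
            .option("database", "test")
            .option("collection", "coll")
            .option("partitioner",
                "com.mongodb.spark.sql.connector.read.partitioner.PaginateBySizePartitioner")
            .option("partitioner.options.partition.size", "64") // MB
            .load();
        df.show();
        spark.stop();
      }
    }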