Skip to content

Commit 7d8fa3a

Browse files
themattman authored and MongoDB Bot committed
SERVER-101659 Make adjustments to time-series write path to enable connecting batch interface (#33038)
GitOrigin-RevId: 8dd565f8587d625cb55b163417891b6fdc133814
1 parent 6a88586 commit 7d8fa3a

9 files changed

+41
-29
lines changed

src/mongo/db/timeseries/bucket_catalog/bucket_catalog.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -947,7 +947,7 @@ Bucket& getEligibleBucket(OperationContext* opCtx,
947947
BucketCatalog& catalog,
948948
Stripe& stripe,
949949
stdx::unique_lock<stdx::mutex>& stripeLock,
950-
const CollectionPtr& bucketsColl,
950+
const Collection* bucketsColl,
951951
const BSONObj& measurement,
952952
const BucketKey& bucketKey,
953953
const Date_t& measurementTimestamp,
@@ -984,7 +984,7 @@ Bucket& getEligibleBucket(OperationContext* opCtx,
984984
catalog,
985985
stripe,
986986
stripeLock,
987-
bucketsColl.get(),
987+
bucketsColl,
988988
bucketKey,
989989
measurementTimestamp,
990990
options,

src/mongo/db/timeseries/bucket_catalog/bucket_catalog.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -471,7 +471,7 @@ Bucket& getEligibleBucket(OperationContext* opCtx,
471471
BucketCatalog& catalog,
472472
Stripe& stripe,
473473
stdx::unique_lock<stdx::mutex>& stripeLock,
474-
const CollectionPtr& bucketsColl,
474+
const Collection* bucketsColl,
475475
const BSONObj& measurement,
476476
const BucketKey& bucketKey,
477477
const Date_t& measurementTimestamp,

src/mongo/db/timeseries/bucket_catalog/bucket_catalog_test.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2744,7 +2744,7 @@ TEST_F(BucketCatalogTest, GetEligibleBucketAllocateBucket) {
27442744
*_bucketCatalog,
27452745
*_bucketCatalog->stripes[batchedInsertCtx.stripeNumber],
27462746
stripeLock,
2747-
bucketsColl,
2747+
bucketsColl.get(),
27482748
measurement,
27492749
batchedInsertCtx.key,
27502750
measurementTimestamp,
@@ -2798,7 +2798,7 @@ TEST_F(BucketCatalogTest, GetEligibleBucketOpenBucket) {
27982798
*_bucketCatalog,
27992799
*_bucketCatalog->stripes[batchedInsertCtx.stripeNumber],
28002800
stripeLock,
2801-
bucketsColl,
2801+
bucketsColl.get(),
28022802
measurement,
28032803
batchedInsertCtx.key,
28042804
measurementTimestamp,

src/mongo/db/timeseries/bucket_catalog/write_batch.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
#include "mongo/bson/oid.h"
4141
#include "mongo/db/operation_id.h"
4242
#include "mongo/db/repl/optime.h"
43+
#include "mongo/db/session/logical_session_id.h"
4344
#include "mongo/db/timeseries/bucket_catalog/bucket_identifiers.h"
4445
#include "mongo/db/timeseries/bucket_catalog/execution_stats.h"
4546
#include "mongo/db/timeseries/bucket_catalog/measurement_map.h"
@@ -129,8 +130,9 @@ struct WriteBatch {
129130
ExecutionStatsController stats;
130131

131132
// Indices for measurements in the original user batch. Used for retryability and
132-
// error-handling.
133+
// error-handling. 'userBatchIndices' and 'stmtIds' should be the same length when entering commit.
133134
std::vector<UserBatchIndex> userBatchIndices;
135+
std::vector<StmtId> stmtIds;
134136

135137
// Marginal numbers for this batch only.
136138
// Sizes.uncommittedMeasurementEstimate is a rough estimate of data in this batch,

src/mongo/db/timeseries/timeseries_write_util.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -407,15 +407,15 @@ StatusWith<bucket_catalog::InsertResult> attemptInsertIntoBucket(
407407
MONGO_UNREACHABLE;
408408
}
409409

410-
TimeseriesBatches insertIntoBucketCatalogForUpdate(
410+
TimeseriesWriteBatches insertIntoBucketCatalogForUpdate(
411411
OperationContext* opCtx,
412412
bucket_catalog::BucketCatalog& bucketCatalog,
413413
const CollectionPtr& bucketsColl,
414414
const std::vector<BSONObj>& measurements,
415415
const NamespaceString& bucketsNs,
416416
TimeseriesOptions& timeSeriesOptions,
417417
const CompressAndWriteBucketFunc& compressAndWriteBucketFunc) {
418-
TimeseriesBatches batches;
418+
TimeseriesWriteBatches batches;
419419

420420
for (const auto& measurement : measurements) {
421421
auto result =
@@ -515,7 +515,7 @@ void commitTimeseriesBucketsAtomically(
515515
const RecordId& recordId,
516516
const boost::optional<std::variant<write_ops::UpdateCommandRequest,
517517
write_ops::DeleteCommandRequest>>& modificationOp,
518-
TimeseriesBatches* batches,
518+
TimeseriesWriteBatches* batches,
519519
const NamespaceString& bucketsNs,
520520
bool fromMigrate,
521521
StmtId stmtId,

src/mongo/db/timeseries/timeseries_write_util.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ BSONObj makeBucketDocument(const std::vector<BSONObj>& measurements,
7575
const TimeseriesOptions& options,
7676
const StringDataComparator* comparator);
7777

78-
using TimeseriesBatches = std::vector<std::shared_ptr<bucket_catalog::WriteBatch>>;
78+
using TimeseriesWriteBatches = std::vector<std::shared_ptr<bucket_catalog::WriteBatch>>;
7979
using TimeseriesStmtIds = stdx::unordered_map<bucket_catalog::WriteBatch*, std::vector<StmtId>>;
8080

8181
/**

src/mongo/db/timeseries/write_ops/internal/timeseries_write_ops_internal.cpp

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1121,10 +1121,10 @@ std::vector<bucket_catalog::BatchedInsertContext> buildBatchedInsertContexts(
11211121
errorsAndIndices);
11221122
}
11231123

1124-
std::vector<std::shared_ptr<bucket_catalog::WriteBatch>> stageInsertBatch(
1124+
TimeseriesWriteBatches stageInsertBatch(
11251125
OperationContext* opCtx,
11261126
bucket_catalog::BucketCatalog& bucketCatalog,
1127-
const CollectionPtr& bucketsColl,
1127+
const Collection* bucketsColl,
11281128
const OperationId& opId,
11291129
const StringDataComparator* comparator,
11301130
uint64_t storageCacheSizeBytes,
@@ -1138,7 +1138,7 @@ std::vector<std::shared_ptr<bucket_catalog::WriteBatch>> stageInsertBatch(
11381138
const auto catalogEra = getCurrentEra(bucketCatalog.bucketStateRegistry);
11391139
auto& stripe = *bucketCatalog.stripes[batch.stripeNumber];
11401140
stdx::unique_lock<stdx::mutex> stripeLock{stripe.mutex};
1141-
std::vector<std::shared_ptr<bucket_catalog::WriteBatch>> writeBatches;
1141+
TimeseriesWriteBatches writeBatches;
11421142
size_t currentPosition = 0;
11431143
bool needsAnotherBucket = true;
11441144

@@ -1182,14 +1182,15 @@ std::vector<std::shared_ptr<bucket_catalog::WriteBatch>> stageInsertBatch(
11821182
}
11831183

11841184

1185-
StatusWith<std::vector<std::shared_ptr<bucket_catalog::WriteBatch>>> prepareInsertsToBuckets(
1185+
StatusWith<TimeseriesWriteBatches> prepareInsertsToBuckets(
11861186
OperationContext* opCtx,
11871187
bucket_catalog::BucketCatalog& bucketCatalog,
1188-
const CollectionPtr& bucketsColl,
1188+
const Collection* bucketsColl,
11891189
const TimeseriesOptions& timeseriesOptions,
11901190
OperationId opId,
11911191
const StringDataComparator* comparator,
11921192
uint64_t storageCacheSizeBytes,
1193+
bool earlyReturnOnError,
11931194
const CompressAndWriteBucketFunc& compressAndWriteBucketFunc,
11941195
const std::vector<BSONObj>& userMeasurementsBatch,
11951196
std::vector<WriteStageErrorAndIndex>& errorsAndIndices) {
@@ -1199,12 +1200,12 @@ StatusWith<std::vector<std::shared_ptr<bucket_catalog::WriteBatch>>> prepareInse
11991200
userMeasurementsBatch,
12001201
errorsAndIndices);
12011202

1202-
// Any errors in the user batch will early-exit and be attempted one-at-a-time.
1203-
if (!errorsAndIndices.empty()) {
1203+
if (earlyReturnOnError && !errorsAndIndices.empty()) {
1204+
// Any errors in the user batch will early-exit and be attempted one-at-a-time.
12041205
return errorsAndIndices.front().error;
12051206
}
12061207

1207-
std::vector<std::shared_ptr<bucket_catalog::WriteBatch>> results;
1208+
TimeseriesWriteBatches results;
12081209

12091210
for (auto& batchedInsertContext : batchedInsertContexts) {
12101211
auto writeBatches = stageInsertBatch(opCtx,

src/mongo/db/timeseries/write_ops/internal/timeseries_write_ops_internal.h

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -163,10 +163,10 @@ std::vector<bucket_catalog::BatchedInsertContext> buildBatchedInsertContexts(
163163
* attempting to stage a measurement into a bucket, the function will find other eligible
164164
* buckets until all measurements are inserted.
165165
*/
166-
std::vector<std::shared_ptr<bucket_catalog::WriteBatch>> stageInsertBatch(
166+
TimeseriesWriteBatches stageInsertBatch(
167167
OperationContext* opCtx,
168168
bucket_catalog::BucketCatalog& bucketCatalog,
169-
const CollectionPtr& bucketsColl,
169+
const Collection* bucketsColl,
170170
const OperationId& opId,
171171
const StringDataComparator* comparator,
172172
uint64_t storageCacheSizeBytes,
@@ -178,15 +178,18 @@ std::vector<std::shared_ptr<bucket_catalog::WriteBatch>> stageInsertBatch(
178178
* Returns a non-success status if any measurements are malformed, and further
179179
* returns the index into 'userMeasurementsBatch' of each failure in 'errorsAndIndices'.
180180
* Returns a write batch per bucket that the measurements are staged to.
181+
* If 'earlyReturnOnError' is true and any measurement is malformed, returns the first error
182+
* without staging anything; if false, staging proceeds despite the malformed measurements.
181183
*/
182-
StatusWith<std::vector<std::shared_ptr<bucket_catalog::WriteBatch>>> prepareInsertsToBuckets(
184+
StatusWith<TimeseriesWriteBatches> prepareInsertsToBuckets(
183185
OperationContext* opCtx,
184186
bucket_catalog::BucketCatalog& bucketCatalog,
185-
const CollectionPtr& bucketsColl,
187+
const Collection* bucketsColl,
186188
const TimeseriesOptions& timeseriesOptions,
187189
OperationId opId,
188190
const StringDataComparator* comparator,
189191
uint64_t storageCacheSizeBytes,
192+
bool earlyReturnOnError,
190193
const CompressAndWriteBucketFunc& compressAndWriteBucketFunc,
191194
const std::vector<BSONObj>& userMeasurementsBatch,
192195
std::vector<WriteStageErrorAndIndex>& errorsAndIndices);

src/mongo/db/timeseries/write_ops/internal/timeseries_write_ops_internal_test.cpp

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -242,7 +242,7 @@ void TimeseriesWriteOpsInternalTest::_testStageInsertBatch(
242242
auto writeBatches =
243243
write_ops::internal::stageInsertBatch(_opCtx,
244244
*_bucketCatalog,
245-
bucketsColl,
245+
bucketsColl.get(),
246246
_opCtx->getOpID(),
247247
nullptr /*comparator*/,
248248
_storageCacheSizeBytes,
@@ -595,11 +595,12 @@ TEST_F(TimeseriesWriteOpsInternalTest, PrepareInsertsToBucketsSimpleOneFullBucke
595595

596596
auto swWriteBatches = prepareInsertsToBuckets(_opCtx,
597597
*_bucketCatalog,
598-
bucketsColl,
598+
bucketsColl.get(),
599599
tsOptions,
600600
_opCtx->getOpID(),
601601
_getCollator(_ns1),
602602
_getStorageCacheSizeBytes(),
603+
/*earlyReturnOnError=*/true,
603604
_compressBucket,
604605
userBatch,
605606
errorsAndIndices);
@@ -630,11 +631,12 @@ TEST_F(TimeseriesWriteOpsInternalTest, PrepareInsertsToBucketsMultipleBucketsOne
630631

631632
auto swWriteBatches = prepareInsertsToBuckets(_opCtx,
632633
*_bucketCatalog,
633-
bucketsColl,
634+
bucketsColl.get(),
634635
tsOptions,
635636
_opCtx->getOpID(),
636637
_getCollator(_ns1),
637638
_getStorageCacheSizeBytes(),
639+
/*earlyReturnOnError=*/true,
638640
_compressBucket,
639641
userBatch,
640642
errorsAndIndices);
@@ -668,11 +670,12 @@ TEST_F(TimeseriesWriteOpsInternalTest, PrepareInsertsToBucketsMultipleBucketsMul
668670

669671
auto swWriteBatches = prepareInsertsToBuckets(_opCtx,
670672
*_bucketCatalog,
671-
bucketsColl,
673+
bucketsColl.get(),
672674
tsOptions,
673675
_opCtx->getOpID(),
674676
_getCollator(_ns1),
675677
_getStorageCacheSizeBytes(),
678+
/*earlyReturnOnError=*/true,
676679
_compressBucket,
677680
userBatch,
678681
errorsAndIndices);
@@ -705,11 +708,12 @@ TEST_F(TimeseriesWriteOpsInternalTest,
705708

706709
auto swWriteBatches = prepareInsertsToBuckets(_opCtx,
707710
*_bucketCatalog,
708-
bucketsColl,
711+
bucketsColl.get(),
709712
tsOptions,
710713
_opCtx->getOpID(),
711714
_getCollator(_ns1),
712715
_getStorageCacheSizeBytes(),
716+
/*earlyReturnOnError=*/true,
713717
_compressBucket,
714718
userBatch,
715719
errorsAndIndices);
@@ -740,11 +744,12 @@ TEST_F(TimeseriesWriteOpsInternalTest, PrepareInsertsBadMeasurementsAll) {
740744

741745
auto swWriteBatches = prepareInsertsToBuckets(_opCtx,
742746
*_bucketCatalog,
743-
bucketsColl,
747+
bucketsColl.get(),
744748
tsOptions,
745749
_opCtx->getOpID(),
746750
_getCollator(_ns1),
747751
_getStorageCacheSizeBytes(),
752+
/*earlyReturnOnError=*/true,
748753
_compressBucket,
749754
userMeasurementsBatch,
750755
errorsAndIndices);
@@ -770,11 +775,12 @@ TEST_F(TimeseriesWriteOpsInternalTest, PrepareInsertsBadMeasurementsSome) {
770775

771776
auto swWriteBatches = prepareInsertsToBuckets(_opCtx,
772777
*_bucketCatalog,
773-
bucketsColl,
778+
bucketsColl.get(),
774779
tsOptions,
775780
_opCtx->getOpID(),
776781
_getCollator(_ns1),
777782
_getStorageCacheSizeBytes(),
783+
/*earlyReturnOnError=*/true,
778784
_compressBucket,
779785
userMeasurementsBatch,
780786
errorsAndIndices);

0 commit comments

Comments
 (0)