Commit 6cc8646

Remove `skip_watermark`. This lets us standardize how the starting ingestion checkpoint is updated, and the checkpoint each pipeline starts processing from. Rework tests.
1 parent 6317899 commit 6cc8646
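The lib.rs diff that actually implements the standardization is not rendered on this page, so as a reading aid only, here is a hypothetical sketch of the kind of single start-up rule the commit message describes. Every name and the exact precedence here are assumptions, not the committed code:

```rust
/// Hypothetical sketch (not the actual lib.rs code): with `skip_watermark`
/// gone, there is one rule for where a pipeline begins, instead of a flag
/// that bypasses watermark updates entirely.
fn starting_checkpoint(
    committed_watermark: Option<u64>,       // pipeline's row in the `watermarks` table
    first_checkpoint_override: Option<u64>, // e.g. a `--first-checkpoint`-style override
) -> u64 {
    match (first_checkpoint_override, committed_watermark) {
        // An explicit override wins; the commit watermark task still runs,
        // it just cannot advance past any gap the override leaves behind.
        (Some(first), _) => first,
        // Otherwise resume immediately after the last committed checkpoint.
        (None, Some(watermark)) => watermark + 1,
        // A brand new pipeline starts from genesis.
        (None, None) => 0,
    }
}
```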

File tree

7 files changed: +835 −442 lines


crates/sui-indexer-alt-framework/src/lib.rs

Lines changed: 813 additions & 343 deletions (large diff not rendered by default)

crates/sui-indexer-alt-framework/src/pipeline/concurrent/commit_watermark.rs

Lines changed: 1 addition & 9 deletions

@@ -41,23 +41,16 @@ use super::Handler;
 /// [LOUD_WATERMARK_UPDATE_INTERVAL]-many checkpoints.
 ///
 /// The task will shutdown if the `cancel` token is signalled, or if the `rx` channel closes and
-/// the watermark cannot be progressed. If `skip_watermark` is set, the task will shutdown
-/// immediately.
+/// the watermark cannot be progressed.
 pub(super) fn commit_watermark<H: Handler + 'static>(
     mut next_checkpoint: u64,
     config: CommitterConfig,
-    skip_watermark: bool,
     mut rx: mpsc::Receiver<Vec<WatermarkPart>>,
     store: H::Store,
     metrics: Arc<IndexerMetrics>,
     cancel: CancellationToken,
 ) -> JoinHandle<()> {
     tokio::spawn(async move {
-        if skip_watermark {
-            info!(pipeline = H::NAME, "Skipping commit watermark task");
-            return;
-        }
-
         let mut poll = interval(config.watermark_interval());
         poll.set_missed_tick_behavior(MissedTickBehavior::Delay);

@@ -333,7 +326,6 @@ mod tests {
         let commit_watermark_handle = commit_watermark::<H>(
             next_checkpoint,
             config,
-            false,
             watermark_rx,
             store_clone,
             metrics,
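The updated spawn site mirrors the test hunk above. A minimal sketch, assuming the crate-internal types (a `Handler` impl called `MyHandler`, plus `store`, `metrics`, and `cancel` bindings) are already in scope; the channel capacity is illustrative:

```rust
// Sketch of the new call shape; `MyHandler` and the surrounding bindings are
// assumed to exist, as in the test above.
let (watermark_tx, watermark_rx) = tokio::sync::mpsc::channel(100);

let commit_watermark_handle = commit_watermark::<MyHandler>(
    next_checkpoint,            // first checkpoint the watermark may advance to
    CommitterConfig::default(), // polling interval, etc.
    watermark_rx,               // no `skip_watermark` flag before this anymore
    store.clone(),
    metrics.clone(),
    cancel.clone(),
);
```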

crates/sui-indexer-alt-framework/src/pipeline/concurrent/committer.rs

Lines changed: 9 additions & 65 deletions

@@ -30,13 +30,12 @@ const MAX_RETRY_INTERVAL: Duration = Duration::from_secs(1);
 ///
 /// The writing of each batch will be repeatedly retried on an exponential back-off until it
 /// succeeds. Once the write succeeds, the [WatermarkPart]s for that batch are sent on `tx` to the
-/// watermark task, as long as `skip_watermark` is not true.
+/// watermark task.
 ///
 /// This task will shutdown via its `cancel`lation token, or if its receiver or sender channels are
 /// closed.
 pub(super) fn committer<H: Handler + 'static>(
     config: CommitterConfig,
-    skip_watermark: bool,
     rx: mpsc::Receiver<BatchedRows<H>>,
     tx: mpsc::Sender<Vec<WatermarkPart>>,
     db: H::Store,

@@ -187,7 +186,7 @@ pub(super) fn committer<H: Handler + 'static>(
             }
         };

-        if !skip_watermark && tx.send(watermark).await.is_err() {
+        if tx.send(watermark).await.is_err() {
             info!(pipeline = H::NAME, "Watermark closed channel");
             return Err(Break::Cancel);
         }

@@ -318,8 +317,7 @@ mod tests {
     ///
     /// # Arguments
     /// * `store` - The mock store to use for testing
-    /// * `skip_watermark` - Whether to skip sending watermarks to the watermark channel
-    async fn setup_test(store: MockStore, skip_watermark: bool) -> TestSetup {
+    async fn setup_test(store: MockStore) -> TestSetup {
         let config = CommitterConfig::default();
         let metrics = IndexerMetrics::new(None, &Default::default());
         let cancel = CancellationToken::new();

@@ -329,16 +327,7 @@ mod tests {

         let store_clone = store.clone();
         let committer_handle = tokio::spawn(async move {
-            let _ = committer(
-                config,
-                skip_watermark,
-                batch_rx,
-                watermark_tx,
-                store_clone,
-                metrics,
-                cancel,
-            )
-            .await;
+            let _ = committer(config, batch_rx, watermark_tx, store_clone, metrics, cancel).await;
         });

         TestSetup {

@@ -351,7 +340,7 @@ mod tests {

     #[tokio::test]
     async fn test_concurrent_batch_processing() {
-        let mut setup = setup_test(MockStore::default(), false).await;
+        let mut setup = setup_test(MockStore::default()).await;

         // Send batches
         let batch1 = BatchedRows {

@@ -434,7 +423,7 @@ mod tests {

     #[tokio::test]
     async fn test_commit_with_retries_for_commit_failure() {
-        let mut setup = setup_test(MockStore::default(), false).await;
+        let mut setup = setup_test(MockStore::default()).await;

         // Create a batch with a single item that will fail once before succeeding
         let batch = BatchedRows {

@@ -503,7 +492,7 @@ mod tests {
             })),
             ..Default::default()
         };
-        let mut setup = setup_test(store, false).await;
+        let mut setup = setup_test(store).await;

         let batch = BatchedRows {
             values: vec![StoredData {

@@ -560,7 +549,7 @@ mod tests {

     #[tokio::test]
     async fn test_empty_batch_handling() {
-        let mut setup = setup_test(MockStore::default(), false).await;
+        let mut setup = setup_test(MockStore::default()).await;

         let empty_batch = BatchedRows {
             values: vec![], // Empty values

@@ -599,54 +588,9 @@ mod tests {
         let _ = setup.committer_handle.await;
     }

-    #[tokio::test]
-    async fn test_skip_watermark_mode() {
-        let mut setup = setup_test(MockStore::default(), true).await;
-
-        let batch = BatchedRows {
-            values: vec![StoredData {
-                cp_sequence_number: 1,
-                tx_sequence_numbers: vec![1, 2, 3],
-                ..Default::default()
-            }],
-            watermark: vec![WatermarkPart {
-                watermark: CommitterWatermark {
-                    epoch_hi_inclusive: 0,
-                    checkpoint_hi_inclusive: 1,
-                    tx_hi: 3,
-                    timestamp_ms_hi_inclusive: 1000,
-                },
-                batch_rows: 1,
-                total_rows: 1,
-            }],
-        };
-
-        // Send the batch
-        setup.batch_tx.send(batch).await.unwrap();
-
-        // Wait for processing
-        tokio::time::sleep(Duration::from_millis(200)).await;
-
-        // Verify data was committed
-        {
-            let data = setup.store.data.get(DataPipeline::NAME).unwrap();
-            assert_eq!(data.get(&1).unwrap().value(), &vec![1, 2, 3]);
-        }
-
-        // Verify no watermark was sent (skip_watermark mode)
-        assert!(
-            setup.watermark_rx.try_recv().is_err(),
-            "No watermark should be sent in skip_watermark mode"
-        );
-
-        // Clean up
-        drop(setup.batch_tx);
-        let _ = setup.committer_handle.await;
-    }
-
     #[tokio::test]
     async fn test_watermark_channel_closed() {
-        let setup = setup_test(MockStore::default(), false).await;
+        let setup = setup_test(MockStore::default()).await;

         let batch = BatchedRows {
             values: vec![StoredData {
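One behavioral consequence of always sending watermarks, visible in the `@@ -187,7 +186,7 @@` hunk and the `test_watermark_channel_closed` test above: if the watermark task's receiver goes away, `tx.send(...)` fails and the committer shuts down with `Break::Cancel`. A self-contained illustration of the channel rule that shutdown path relies on:

```rust
// Demonstrates the tokio mpsc behavior the committer's shutdown path depends
// on: once the receiver is dropped, every subsequent send returns an error.
#[tokio::main]
async fn main() {
    let (tx, rx) = tokio::sync::mpsc::channel::<u64>(8);

    // Simulate the commit watermark task going away.
    drop(rx);

    // The committer's `tx.send(watermark).await.is_err()` branch fires here,
    // logs "Watermark closed channel", and returns Err(Break::Cancel).
    assert!(tx.send(42).await.is_err());
}
```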

crates/sui-indexer-alt-framework/src/pipeline/concurrent/mod.rs

Lines changed: 1 addition & 7 deletions

@@ -177,8 +177,7 @@ impl Default for PrunerConfig {
 /// time.
 ///
 /// The pipeline also maintains a row in the `watermarks` table for the pipeline which tracks the
-/// watermark below which all data has been committed (modulo pruning), as long as `skip_watermark`
-/// is not true.
+/// watermark below which all data has been committed (modulo pruning).
 ///
 /// Checkpoint data is fed into the pipeline through the `checkpoint_rx` channel, and internal
 /// channels are created to communicate between its various components. The pipeline can be

@@ -188,7 +187,6 @@ pub(crate) fn pipeline<H: Handler + Send + Sync + 'static>(
     handler: H,
     next_checkpoint: u64,
     config: ConcurrentConfig,
-    skip_watermark: bool,
     store: H::Store,
     checkpoint_rx: mpsc::Receiver<Arc<CheckpointData>>,
     metrics: Arc<IndexerMetrics>,

@@ -236,7 +234,6 @@ pub(crate) fn pipeline<H: Handler + Send + Sync + 'static>(

     let committer = committer::<H>(
         committer_config.clone(),
-        skip_watermark,
         committer_rx,
         committer_tx,
         store.clone(),

@@ -247,7 +244,6 @@ pub(crate) fn pipeline<H: Handler + Send + Sync + 'static>(
     let commit_watermark = commit_watermark::<H>(
         next_checkpoint,
         committer_config,
-        skip_watermark,
         watermark_rx,
         store.clone(),
         metrics.clone(),

@@ -391,12 +387,10 @@ mod tests {
         let metrics = IndexerMetrics::new(None, &Registry::default());
         let cancel = CancellationToken::new();

-        let skip_watermark = false;
         let pipeline_handle = pipeline(
             DataPipeline,
             next_checkpoint,
             config,
-            skip_watermark,
             store.clone(),
             checkpoint_rx,
             metrics,
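The invariant behind the doc-comment change above is worth spelling out: the `watermarks` row only ever records a checkpoint below which everything is committed, so commits that land above a gap cannot move it, and the gap instead accumulates pending watermark parts (compare `WARN_PENDING_WATERMARKS` in the next file). A self-contained toy model of that rule; `advance_watermark` is an illustrative helper, not framework code:

```rust
/// Toy model: given the set of committed checkpoints and the next checkpoint
/// the watermark may advance past, return the new high watermark (inclusive).
/// Illustrative only -- the framework's real bookkeeping is more involved.
fn advance_watermark(mut committed: Vec<u64>, mut next: u64) -> Option<u64> {
    committed.sort_unstable();
    for cp in committed {
        // Only a contiguous run starting at `next` moves the watermark.
        if cp == next {
            next += 1;
        }
    }
    next.checked_sub(1)
}

fn main() {
    // Checkpoints 0-2 are committed; 4 finished early, but 3 is still in
    // flight, so the watermark stops at 2 and checkpoint 4 stays pending.
    assert_eq!(advance_watermark(vec![0, 1, 2, 4], 0), Some(2));
}
```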

crates/sui-indexer-alt-framework/src/pipeline/mod.rs

Lines changed: 0 additions & 3 deletions

@@ -21,9 +21,6 @@ const PIPELINE_BUFFER: usize = 5;
 /// happen if the pipeline was started with its initial checkpoint overridden to be strictly
 /// greater than its current watermark -- in that case, the pipeline will never be able to update
 /// its watermarks.
-///
-/// This may be a legitimate thing to do when backfilling a table, but in that case
-/// `--skip-watermarks` should be used.
 const WARN_PENDING_WATERMARKS: usize = 10000;

 #[derive(Serialize, Deserialize, Debug, Clone)]

crates/sui-indexer-alt/src/benchmark.rs

Lines changed: 0 additions & 1 deletion

@@ -51,7 +51,6 @@ pub async fn run_benchmark(
         first_checkpoint: Some(first_checkpoint),
         last_checkpoint: Some(last_checkpoint),
         pipeline,
-        ..Default::default()
     };

     let client_args = ClientArgs {

docs/content/guides/developer/advanced/custom-indexer/indexer-runtime-perf.mdx

Lines changed: 11 additions & 14 deletions

@@ -6,11 +6,11 @@ keywords: [ sui indexer performance, indexer optimization, tokio console debuggi

 Proper configuration and resource monitoring delivers the most performant custom indexer possible. For example:

-- Runtime configuration options for ingestion, database connections, and pipeline selection, as well as purposeful use of debugging tools like `tokio_console` help dial in your indexer performance.
+- Runtime configuration options for ingestion, database connections, and pipeline selection, as well as purposeful use of debugging tools like `tokio_console` help dial in your indexer performance.

-- A sensible strategy targeting efficient data pruning for your tables keeps them performant over time.
+- A sensible strategy targeting efficient data pruning for your tables keeps them performant over time.

-- Following best practices for exposing and extending Prometheus metrics helps you keep track of indexer performance.
+- Following best practices for exposing and extending Prometheus metrics helps you keep track of indexer performance.

 Together, these techniques help you run indexers that are fast, resource-efficient, and easier to monitor in both development and production.

@@ -26,10 +26,10 @@ Control how checkpoint data is fetched and distributed:
 let ingestion_config = IngestionConfig {
     // Buffer size across all downstream workers (default: 5000)
     checkpoint_buffer_size: 10000,
-
+
     // Concurrent checkpoint fetches (default: 200)
     ingest_concurrency: 500,
-
+
     // Retry interval for missing checkpoints in ms (default: 200)
     retry_interval_ms: 100,
 };

@@ -47,10 +47,10 @@ let ingestion_config = IngestionConfig {
 let db_args = DbArgs {
     // Connection pool size (default: 100)
     db_connection_pool_size: 200,
-
+
     // Connection timeout in ms (default: 60,000)
     db_connection_timeout_ms: 30000,
-
+
     // Statement timeout in ms (default: None)
     db_statement_timeout_ms: Some(120000),
 };

@@ -64,7 +64,7 @@ let db_args = DbArgs {

 ### Command-line arguments

-Include the following command-line arguments to help focus processing. These values are for demonstration. Use values that make sense to your environment and goals.
+Include the following command-line arguments to help focus processing. These values are for demonstration. Use values that make sense to your environment and goals.

 ```sh
 # Checkpoint range control

@@ -74,9 +74,6 @@ Include the following command-line arguments to help focus processing. These val
 # Pipeline selection
 --pipeline "tx_counts" # Run specific pipeline only
 --pipeline "events" # Can specify multiple pipelines
-
-# Watermark behavior
---skip-watermark
 ```

 **Use cases:**

@@ -256,7 +253,7 @@ Pipelines with more complex pruning rules can still benefit. For example, in con

 ### Implementation

-You can use `pg_partman` to simplify partition management. Configure a partitioned table with `create_parent`, followed by a cron job to periodically `run_maintenance`. You might need to iterate to determine the correct frequency of `run_maintenance`.
+You can use `pg_partman` to simplify partition management. Configure a partitioned table with `create_parent`, followed by a cron job to periodically `run_maintenance`. You might need to iterate to determine the correct frequency of `run_maintenance`.

 :::info

@@ -319,7 +316,7 @@ BEGIN
         -- Table is already managed
         result_message := 'EXISTS: Table ' || full_table_name || ' is already managed by pg_partman';
     END IF;
-
+
     RETURN result_message;
 END;
 $$ LANGUAGE plpgsql;

@@ -347,7 +344,7 @@
 FROM part_config
 ORDER BY parent_table;

 -- Check maintenance job is scheduled
-SELECT jobid, schedule, command, nodename, database, username
+SELECT jobid, schedule, command, nodename, database, username
 FROM cron.job
 WHERE command = 'SELECT run_maintenance()';
 ```
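Putting the guide's two config fragments side by side: the field names and defaults come from the hunks above, while the surrounding binding context is illustrative (how these structs are handed to the indexer builder is not shown on this page):

```rust
// Field names and defaults are from the guide above; this block only shows
// them together for reference.
let ingestion_config = IngestionConfig {
    checkpoint_buffer_size: 10000, // default: 5000
    ingest_concurrency: 500,       // default: 200
    retry_interval_ms: 100,        // default: 200
};

let db_args = DbArgs {
    db_connection_pool_size: 200,          // default: 100
    db_connection_timeout_ms: 30000,       // default: 60,000
    db_statement_timeout_ms: Some(120000), // default: None
};
```

With this commit, `--skip-watermark` disappears from the flag list in the guide; checkpoint range control through `first_checkpoint`/`last_checkpoint` (as in `benchmark.rs` earlier) is unaffected.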
