From 92a38fb3876143fb419eac91c3c693d2682a2037 Mon Sep 17 00:00:00 2001 From: Alisue Date: Sun, 6 Apr 2025 10:43:32 +0900 Subject: [PATCH] feat: Add chunk interval to collect and match processors Close #32 --- denops/fall/processor/collect.ts | 11 ++++++++++- denops/fall/processor/match.ts | 11 ++++++++++- 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/denops/fall/processor/collect.ts b/denops/fall/processor/collect.ts index 7044799..9fba156 100644 --- a/denops/fall/processor/collect.ts +++ b/denops/fall/processor/collect.ts @@ -9,10 +9,12 @@ import { dispatch } from "../event.ts"; const THRESHOLD = 10000; const CHUNK_SIZE = 1000; +const CHUNK_INTERVAL = 100; export type CollectProcessorOptions = { threshold?: number; chunkSize?: number; + chunkInterval?: number; }; export class CollectProcessor implements Disposable { @@ -21,6 +23,7 @@ export class CollectProcessor implements Disposable { readonly #source: Source; readonly #threshold: number; readonly #chunkSize: number; + readonly #chunkInterval: number; #processing?: Promise; #paused?: PromiseWithResolvers; @@ -31,6 +34,7 @@ export class CollectProcessor implements Disposable { this.#source = source; this.#threshold = options.threshold ?? THRESHOLD; this.#chunkSize = options.chunkSize ?? CHUNK_SIZE; + this.#chunkInterval = options.chunkInterval ?? CHUNK_INTERVAL; } get items() { @@ -72,10 +76,15 @@ export class CollectProcessor implements Disposable { dispatch({ type: "collect-processor-updated" }); }; const chunker = new Chunker>(this.#chunkSize); + let lastChunkTime = performance.now(); for await (const item of iter) { if (this.#paused) await this.#paused.promise; signal.throwIfAborted(); - if (chunker.put(item)) { + if ( + chunker.put(item) || + performance.now() - lastChunkTime > this.#chunkInterval + ) { + lastChunkTime = performance.now(); update(chunker.consume()); } } diff --git a/denops/fall/processor/match.ts b/denops/fall/processor/match.ts index 8509e6a..3e632d5 100644 --- a/denops/fall/processor/match.ts +++ b/denops/fall/processor/match.ts @@ -11,11 +11,13 @@ import { dispatch } from "../event.ts"; const INTERVAL = 0; const THRESHOLD = 10000; const CHUNK_SIZE = 1000; +const CHUNK_INTERVAL = 100; export type MatchProcessorOptions = { interval?: number; threshold?: number; chunkSize?: number; + chunkInterval?: number; incremental?: boolean; }; @@ -24,6 +26,7 @@ export class MatchProcessor implements Disposable { readonly #interval: number; readonly #threshold: number; readonly #chunkSize: number; + readonly #chunkInterval: number; readonly #incremental: boolean; #controller: AbortController = new AbortController(); #processing?: Promise; @@ -38,6 +41,7 @@ export class MatchProcessor implements Disposable { this.#interval = options.interval ?? INTERVAL; this.#threshold = options.threshold ?? THRESHOLD; this.#chunkSize = options.chunkSize ?? CHUNK_SIZE; + this.#chunkInterval = options.chunkInterval ?? CHUNK_INTERVAL; this.#incremental = options.incremental ?? false; } @@ -109,9 +113,14 @@ export class MatchProcessor implements Disposable { } }; const chunker = new Chunker>(this.#chunkSize); + let lastChunkTime = performance.now(); for await (const item of iter) { signal.throwIfAborted(); - if (chunker.put(item)) { + if ( + chunker.put(item) || + performance.now() - lastChunkTime > this.#chunkInterval + ) { + lastChunkTime = performance.now(); update(chunker.consume()); await delay(this.#interval, { signal }); }