Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
262 changes: 174 additions & 88 deletions Lite/Services/ArchiveService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,115 @@ state and will be disposed by the caller's `using` shortly. */
["query_store_stats"] = ["query_plan_text"]
};

/* Upper bound on the total on-disk parquet bytes fed into a single compaction
merge batch. Wide-VARCHAR tables (query_snapshots) expand 5-10x on read, so
capping the input keeps the in-memory working set of one COPY well below the
4 GB compaction memory_limit even on the worst data shapes. A month/table
group whose files exceed this budget is split into several batches, each
producing its own _ptNNN.parquet output file. See #933 followup — a 72-file
query_snapshots backlog at 4 GB OOM'd on real allocation pressure during the
final merge. */
private const long MaxBatchInputBytes = 200L * 1024 * 1024; /* 200 MiB (209,715,200 bytes) */

/* Greedily group <paramref name="sortedPaths"/> (expected smallest-first) into
batches whose total on-disk bytes don't exceed <paramref name="maxBytes"/>.

Files are taken in the order given; a new batch is started whenever adding the
next file would push the running total over the cap. A single file larger than
the cap becomes its own one-element batch — that's the degenerate case (the cap
can't split an individual file) and the caller handles it as a single-file
pass-through merge. An empty input yields an empty list.

Paths may use either '/' or '\' separators. Forward slashes are mapped to the
platform's own separator before the size lookup, so the lookup also works on
hosts where '\' is not a directory separator (hard-coding '\' there would make
every lookup throw FileNotFoundException).

Throws whatever FileInfo.Length throws (e.g. FileNotFoundException) when a
listed file cannot be sized. */
private static List<List<string>> BuildSizeBudgetedBatches(IReadOnlyList<string> sortedPaths, long maxBytes)
{
    var batches = new List<List<string>>();
    var current = new List<string>();
    long currentBytes = 0;

    foreach (var p in sortedPaths)
    {
        /* Same result as Replace("/", "\\") on Windows; a no-op elsewhere. */
        var size = new FileInfo(p.Replace('/', Path.DirectorySeparatorChar)).Length;

        /* Flush only when the batch already holds something, so an oversized
        file still lands in a batch of its own instead of producing an empty one. */
        if (currentBytes + size > maxBytes && current.Count > 0)
        {
            batches.Add(current);
            current = new List<string>();
            currentBytes = 0;
        }

        current.Add(p);
        currentBytes += size;
    }

    if (current.Count > 0)
    {
        batches.Add(current);
    }

    return batches;
}

/* Merge one size-budgeted batch of parquet files into <paramref name="outputPath"/>.

<paramref name="table"/> is passed to BuildSelectClause to pick the column list,
<paramref name="sourcePaths"/> are the files to merge (the caller has already
sorted them smallest-first), and <paramref name="spillDirSql"/> is the directory
handed to DuckDB as temp_directory.

Strategy:
- 1 or 2 files: a single COPY straight into outputPath (this also covers the
  degenerate one-file batch).
- 3+ files: incremental pairwise merge. The first file is the accumulator; each
  later file is folded in with its own COPY, writing to
  "<outputPath>.stepN.tmp" for every step except the last, which writes
  outputPath. Input order is preserved so the accumulator grows steadily and
  small files are folded in early when memory is cheapest.

Every COPY runs on its own in-memory DuckDB connection with the compaction
tuning from #933:
- memory_limit = 4GB: parquet COPY does allocations that bypass the buffer
  manager and can't be spilled. The cap is a hard ceiling for those, not
  a spill trigger. 4GB leaves real headroom for wide-VARCHAR data within
  the batch-size budget. Aligns with DuckDB's OOM guide (50-60% of RAM).
- threads = 2: fewer per-thread row-group buffers in flight.
- ROW_GROUP_SIZE 8192: smaller buffered batch per row group.
- preserve_insertion_order = false: lets DuckDB stream.
See tools/CompactionRepro for the stress reproducer.

Throws ArgumentException for an empty batch; any exception from a COPY
propagates to the caller. On such a failure the most recent ".stepN.tmp" file
may be left behind — this method does not remove it. */
private void MergeBatchToFile(string table, List<string> sourcePaths, string outputPath, string spillDirSql)
{
    if (sourcePaths.Count == 0)
    {
        /* Fail fast instead of sending "read_parquet([])" to DuckDB. */
        throw new ArgumentException("A merge batch must contain at least one source file.", nameof(sourcePaths));
    }

    if (sourcePaths.Count <= 2)
    {
        /* Small batch — single-pass merge. */
        CopyMerged(sourcePaths.ToArray(), outputPath);
        return;
    }

    /* Larger batch — fold files into the accumulator one at a time. */
    var lastIndex = sourcePaths.Count - 1;
    var accumulatorPath = sourcePaths[0];

    for (var i = 1; i <= lastIndex; i++)
    {
        var stepOutput = i < lastIndex ? StepPath(i) : outputPath;

        CopyMerged(new[] { accumulatorPath, sourcePaths[i] }, stepOutput);

        /* From the second step on, the accumulator just consumed was the
        previous step's intermediate. CopyMerged has already disposed its
        connection by now, so this step's connection can no longer be
        attached to the file when we delete it. */
        if (i > 1)
        {
            try { File.Delete(StepPath(i - 1)); } catch { /* best effort */ }
        }

        accumulatorPath = stepOutput;
    }

    /* Name of the intermediate file written by pairwise step N. */
    string StepPath(int step) => outputPath + $".step{step}.tmp";

    /* Run one tuned COPY that merges inputPaths into destinationPath on a
    fresh in-memory connection; the connection is disposed on return. */
    void CopyMerged(string[] inputPaths, string destinationPath)
    {
        using var con = new DuckDBConnection("DataSource=:memory:");
        con.Open();
        using (var pragma = con.CreateCommand())
        {
            pragma.CommandText = $"SET memory_limit = '4GB'; SET threads = 2; SET preserve_insertion_order = false; SET temp_directory = '{EscapeSqlPath(spillDirSql)}';";
            pragma.ExecuteNonQuery();
        }

        var selectClause = BuildSelectClause(table, inputPaths);
        var pathList = string.Join(", ", inputPaths.Select(p => $"'{EscapeSqlPath(p)}'"));
        using var cmd = con.CreateCommand();
        cmd.CommandText = $"COPY (SELECT {selectClause} FROM read_parquet([{pathList}], union_by_name=true)) " +
            $"TO '{EscapeSqlPath(destinationPath)}' (FORMAT PARQUET, COMPRESSION ZSTD, ROW_GROUP_SIZE 8192)";
        cmd.ExecuteNonQuery();
    }
}

/* Build the SELECT clause for a compaction COPY, excluding only the
CompactionExcludeColumns actually present in THIS set of files.
Detection must be per-merge-set, not global: archive files predating a
Expand Down Expand Up @@ -376,6 +485,19 @@ private void CompactParquetFiles()
}
}

/* YYYYMM_tablename_ptNNN (multi-part monthly — must match before the
generic YYYYMM_tablename regex below, otherwise the trailing _ptNNN
gets captured as part of the table name and groups get split). */
if (month == null)
{
m = Regex.Match(name, @"^(\d{6})_(.+)_pt\d{3}$");
if (m.Success)
{
month = m.Groups[1].Value;
table = m.Groups[2].Value;
}
}

/* YYYYMM_tablename (already monthly — our target format) */
if (month == null)
{
Expand Down Expand Up @@ -423,7 +545,8 @@ parquet files already live on. */

foreach (var ((month, table), files) in groups)
{
/* If there's exactly one file and it's already in monthly format, skip */
/* If there's exactly one file and it's already in monthly format, skip.
This regex matches both YYYYMM_table.parquet and YYYYMM_table_ptNNN.parquet. */
if (files.Count == 1)
{
var name = Path.GetFileNameWithoutExtension(files[0]);
Expand All @@ -438,93 +561,50 @@ parquet files already live on. */
? DateTime.UtcNow.ToString("yyyyMM")
: month;

var targetFile = $"{targetMonth}_{table}.parquet";
var targetPath = Path.Combine(_archivePath, targetFile).Replace("\\", "/");
var tempPath = targetPath + ".tmp";

try
{
var sourcePaths = files
.Select(f => Path.Combine(_archivePath, f).Replace("\\", "/"))
.ToList();

if (sourcePaths.Count <= 2)
{
/* Small group — single-pass merge.

Pragma tuning (history per #933):
- memory_limit = 4GB: parquet COPY does allocations that
bypass the buffer manager and can't be spilled. The cap
is effectively a hard ceiling for those, not a spill
trigger. At 1GB (the prior value) the reproducer dies
at ~900/953 MiB used before any rows are read. 4GB
leaves enough headroom for query_snapshots-shaped data
(wide VARCHAR plan XML) and aligns with DuckDB's OOM
guide recommendation of 50-60% of system RAM.
- threads = 2: fewer per-thread row-group buffers in flight.
- ROW_GROUP_SIZE 8192: smaller buffered batch per group.
- preserve_insertion_order = false: lets DuckDB stream.
See tools/CompactionRepro for the stress reproducer. */
using var con = new DuckDBConnection("DataSource=:memory:");
con.Open();
using (var pragma = con.CreateCommand())
{
pragma.CommandText = $"SET memory_limit = '4GB'; SET threads = 2; SET preserve_insertion_order = false; SET temp_directory = '{EscapeSqlPath(spillDirSql)}';";
pragma.ExecuteNonQuery();
}
/* Sort smallest-first so size-budget batches fill cheaply at first. */
var sorted = sourcePaths
.OrderBy(p => new FileInfo(p.Replace("/", "\\")).Length)
.ToList();

var selectClause = BuildSelectClause(table, sourcePaths);
var pathList = string.Join(", ", sourcePaths.Select(p => $"'{EscapeSqlPath(p)}'"));
using var cmd = con.CreateCommand();
cmd.CommandText = $"COPY (SELECT {selectClause} FROM read_parquet([{pathList}], union_by_name=true)) " +
$"TO '{EscapeSqlPath(tempPath)}' (FORMAT PARQUET, COMPRESSION ZSTD, ROW_GROUP_SIZE 8192)";
cmd.ExecuteNonQuery();
}
else
/* Bucket files into size-budgeted batches. Cap each batch's on-disk
parquet bytes so a single COPY doesn't try to merge an unbounded
amount of expanded VARCHAR data. Wide-row tables (query_snapshots'
plan XML) expand ~5-10x in memory on read; a 72-file backlog at
the 4 GB compaction memory_limit OOM'd on real allocation pressure
(not pre-reservation) — see #933 followup. The cap is sized so
that even with ~10x expansion the in-memory load stays well under
4 GB. Narrow tables fit one batch with hundreds of files in it. */
var batches = BuildSizeBudgetedBatches(sorted, MaxBatchInputBytes);

/* Plan the output names. With one batch we keep the existing
YYYYMM_table.parquet name (backward compatible). With multiple
batches we emit YYYYMM_table_ptNNN.parquet — the archive views
already glob "*_table.parquet" so readers see them all. */
var batchOutputs = new List<(string TempPath, string FinalPath)>();
for (var i = 0; i < batches.Count; i++)
{
/* Large group — incremental merge (pairs) to keep peak memory low.
Sort smallest-first so early merges are cheap. */
var sorted = sourcePaths
.OrderBy(p => new FileInfo(p.Replace("/", "\\")).Length)
.ToList();

var currentPath = sorted[0];
var intermediateFiles = new List<string>();

for (var i = 1; i < sorted.Count; i++)
{
var stepOutput = i < sorted.Count - 1
? targetPath + $".step{i}.tmp"
: tempPath;

using var con = new DuckDBConnection("DataSource=:memory:");
con.Open();
using (var pragma = con.CreateCommand())
{
pragma.CommandText = $"SET memory_limit = '4GB'; SET threads = 2; SET preserve_insertion_order = false; SET temp_directory = '{EscapeSqlPath(spillDirSql)}';";
pragma.ExecuteNonQuery();
}

var selectClause = BuildSelectClause(table, new[] { currentPath, sorted[i] });
var pairList = $"'{EscapeSqlPath(currentPath)}', '{EscapeSqlPath(sorted[i])}'";
using var cmd = con.CreateCommand();
cmd.CommandText = $"COPY (SELECT {selectClause} FROM read_parquet([{pairList}], union_by_name=true)) " +
$"TO '{EscapeSqlPath(stepOutput)}' (FORMAT PARQUET, COMPRESSION ZSTD, ROW_GROUP_SIZE 8192)";
cmd.ExecuteNonQuery();

/* Clean up previous intermediate file */
if (intermediateFiles.Count > 0)
{
var prev = intermediateFiles[^1];
try { File.Delete(prev); } catch { /* best effort */ }
}
var finalName = batches.Count == 1
? $"{targetMonth}_{table}.parquet"
: $"{targetMonth}_{table}_pt{i + 1:D3}.parquet";
var finalPath = Path.Combine(_archivePath, finalName).Replace("\\", "/");
batchOutputs.Add((TempPath: finalPath + ".tmp", FinalPath: finalPath));
}

intermediateFiles.Add(stepOutput);
currentPath = stepOutput;
}
/* Run each batch's merge into its temp file. If any batch throws,
the catch below cleans up all temps and we leave the originals in
place for next cycle's retry. */
for (var i = 0; i < batches.Count; i++)
{
MergeBatchToFile(table, batches[i], batchOutputs[i].TempPath, spillDirSql);
}

/* Remove originals */
/* All batches succeeded — delete originals, promote temps. */
var removed = 0;
foreach (var f in files)
{
Expand All @@ -540,28 +620,34 @@ Sort smallest-first so early merges are cheap. */
}
}

/* Rename temp to final */
if (File.Exists(targetPath))
foreach (var (tempPath, finalPath) in batchOutputs)
{
File.Delete(targetPath);
if (File.Exists(finalPath))
{
File.Delete(finalPath);
}
File.Move(tempPath, finalPath);
}
File.Move(tempPath, targetPath);

totalMerged++;
totalRemoved += removed;

_logger?.LogDebug("Compacted {Count} files into {Target}", files.Count, targetFile);
if (batches.Count == 1)
{
_logger?.LogDebug("Compacted {Count} files into {Target}", files.Count, batchOutputs[0].FinalPath);
}
else
{
_logger?.LogInformation("Compacted {Count} files into {Parts} part files for {Month}/{Table} (input too large for single batch)",
files.Count, batches.Count, targetMonth, table);
}
}
catch (Exception ex)
{
_logger?.LogError(ex, "Failed to compact {Month}/{Table} ({Count} files)", month, table, files.Count);

/* Clean up temp and intermediate files on failure */
if (File.Exists(tempPath))
{
try { File.Delete(tempPath); } catch { /* best effort */ }
}
foreach (var stepFile in Directory.GetFiles(_archivePath, $"{targetMonth}_{table}.parquet.step*.tmp"))
/* Best-effort cleanup of any temp/intermediate files. */
foreach (var stepFile in Directory.GetFiles(_archivePath, $"{targetMonth}_{table}*.tmp"))
{
try { File.Delete(stepFile); } catch { /* best effort */ }
}
Expand Down
Loading