diff --git a/Lite/Services/ArchiveService.cs b/Lite/Services/ArchiveService.cs index ed1100e..e0b24c1 100644 --- a/Lite/Services/ArchiveService.cs +++ b/Lite/Services/ArchiveService.cs @@ -252,6 +252,115 @@ state and will be disposed by the caller's `using` shortly. */ ["query_store_stats"] = ["query_plan_text"] }; + /* Maximum total on-disk parquet bytes per compaction merge batch. Wide-VARCHAR + tables (query_snapshots) expand 5-10x on read; this cap keeps the in-memory + working set during a COPY well below the 4 GB compaction memory_limit even + on the worst data shapes. Groups exceeding this budget produce multiple + _ptNNN.parquet output files. See #933 followup — a 72-file query_snapshots + backlog at 4 GB OOM'd on real allocation pressure during the final merge. */ + private const long MaxBatchInputBytes = 200L * 1024 * 1024; /* 200 MB */ + + /* Greedily group (smallest-first) into batches + whose total on-disk bytes don't exceed . A single + file larger than the cap becomes its own one-element batch — that's the + degenerate case (the cap can't split an individual file) and the caller + handles it as a single-file pass-through merge. */ + private static List> BuildSizeBudgetedBatches(IReadOnlyList sortedPaths, long maxBytes) + { + var batches = new List>(); + var current = new List(); + long currentBytes = 0; + + foreach (var p in sortedPaths) + { + var size = new FileInfo(p.Replace("/", "\\")).Length; + if (currentBytes + size > maxBytes && current.Count > 0) + { + batches.Add(current); + current = new List(); + currentBytes = 0; + } + current.Add(p); + currentBytes += size; + } + if (current.Count > 0) + { + batches.Add(current); + } + + return batches; + } + + /* Merge one size-budgeted batch into . The pragma + block matches the compaction tuning from #933: + - memory_limit = 4GB: parquet COPY does allocations that bypass the buffer + manager and can't be spilled. The cap is a hard ceiling for those, not + a spill trigger. 4GB leaves real headroom for wide-VARCHAR data within + the batch-size budget. Aligns with DuckDB's OOM guide (50-60% of RAM). + - threads = 2: fewer per-thread row-group buffers in flight. + - ROW_GROUP_SIZE 8192: smaller buffered batch per row group. + - preserve_insertion_order = false: lets DuckDB stream. + See tools/CompactionRepro for the stress reproducer. */ + private void MergeBatchToFile(string table, List sourcePaths, string outputPath, string spillDirSql) + { + if (sourcePaths.Count <= 2) + { + /* Small batch — single-pass merge (also covers the degenerate 1-file case). */ + using var con = new DuckDBConnection("DataSource=:memory:"); + con.Open(); + using (var pragma = con.CreateCommand()) + { + pragma.CommandText = $"SET memory_limit = '4GB'; SET threads = 2; SET preserve_insertion_order = false; SET temp_directory = '{EscapeSqlPath(spillDirSql)}';"; + pragma.ExecuteNonQuery(); + } + + var selectClause = BuildSelectClause(table, sourcePaths); + var pathList = string.Join(", ", sourcePaths.Select(p => $"'{EscapeSqlPath(p)}'")); + using var cmd = con.CreateCommand(); + cmd.CommandText = $"COPY (SELECT {selectClause} FROM read_parquet([{pathList}], union_by_name=true)) " + + $"TO '{EscapeSqlPath(outputPath)}' (FORMAT PARQUET, COMPRESSION ZSTD, ROW_GROUP_SIZE 8192)"; + cmd.ExecuteNonQuery(); + return; + } + + /* Larger batch — incremental pairwise merge. Caller has already sorted + smallest-first across the whole group; within a batch we preserve that + order so the accumulator grows steadily and small files are folded in + early when memory is cheapest. */ + var currentPath = sourcePaths[0]; + var intermediateFiles = new List(); + + for (var i = 1; i < sourcePaths.Count; i++) + { + var stepOutput = i < sourcePaths.Count - 1 + ? outputPath + $".step{i}.tmp" + : outputPath; + + using var con = new DuckDBConnection("DataSource=:memory:"); + con.Open(); + using (var pragma = con.CreateCommand()) + { + pragma.CommandText = $"SET memory_limit = '4GB'; SET threads = 2; SET preserve_insertion_order = false; SET temp_directory = '{EscapeSqlPath(spillDirSql)}';"; + pragma.ExecuteNonQuery(); + } + + var selectClause = BuildSelectClause(table, new[] { currentPath, sourcePaths[i] }); + var pairList = $"'{EscapeSqlPath(currentPath)}', '{EscapeSqlPath(sourcePaths[i])}'"; + using var cmd = con.CreateCommand(); + cmd.CommandText = $"COPY (SELECT {selectClause} FROM read_parquet([{pairList}], union_by_name=true)) " + + $"TO '{EscapeSqlPath(stepOutput)}' (FORMAT PARQUET, COMPRESSION ZSTD, ROW_GROUP_SIZE 8192)"; + cmd.ExecuteNonQuery(); + + if (intermediateFiles.Count > 0) + { + var prev = intermediateFiles[^1]; + try { File.Delete(prev); } catch { /* best effort */ } + } + intermediateFiles.Add(stepOutput); + currentPath = stepOutput; + } + } + /* Build the SELECT clause for a compaction COPY, excluding only the CompactionExcludeColumns actually present in THIS set of files. Detection must be per-merge-set, not global: archive files predating a @@ -376,6 +485,19 @@ private void CompactParquetFiles() } } + /* YYYYMM_tablename_ptNNN (multi-part monthly — must match before the + generic YYYYMM_tablename regex below, otherwise the trailing _ptNNN + gets captured as part of the table name and groups get split). */ + if (month == null) + { + m = Regex.Match(name, @"^(\d{6})_(.+)_pt\d{3}$"); + if (m.Success) + { + month = m.Groups[1].Value; + table = m.Groups[2].Value; + } + } + /* YYYYMM_tablename (already monthly — our target format) */ if (month == null) { @@ -423,7 +545,8 @@ parquet files already live on. */ foreach (var ((month, table), files) in groups) { - /* If there's exactly one file and it's already in monthly format, skip */ + /* If there's exactly one file and it's already in monthly format, skip. + This regex matches both YYYYMM_table.parquet and YYYYMM_table_ptNNN.parquet. */ if (files.Count == 1) { var name = Path.GetFileNameWithoutExtension(files[0]); @@ -438,93 +561,50 @@ parquet files already live on. */ ? DateTime.UtcNow.ToString("yyyyMM") : month; - var targetFile = $"{targetMonth}_{table}.parquet"; - var targetPath = Path.Combine(_archivePath, targetFile).Replace("\\", "/"); - var tempPath = targetPath + ".tmp"; - try { var sourcePaths = files .Select(f => Path.Combine(_archivePath, f).Replace("\\", "/")) .ToList(); - if (sourcePaths.Count <= 2) - { - /* Small group — single-pass merge. - - Pragma tuning (history per #933): - - memory_limit = 4GB: parquet COPY does allocations that - bypass the buffer manager and can't be spilled. The cap - is effectively a hard ceiling for those, not a spill - trigger. At 1GB (the prior value) the reproducer dies - at ~900/953 MiB used before any rows are read. 4GB - leaves enough headroom for query_snapshots-shaped data - (wide VARCHAR plan XML) and aligns with DuckDB's OOM - guide recommendation of 50-60% of system RAM. - - threads = 2: fewer per-thread row-group buffers in flight. - - ROW_GROUP_SIZE 8192: smaller buffered batch per group. - - preserve_insertion_order = false: lets DuckDB stream. - See tools/CompactionRepro for the stress reproducer. */ - using var con = new DuckDBConnection("DataSource=:memory:"); - con.Open(); - using (var pragma = con.CreateCommand()) - { - pragma.CommandText = $"SET memory_limit = '4GB'; SET threads = 2; SET preserve_insertion_order = false; SET temp_directory = '{EscapeSqlPath(spillDirSql)}';"; - pragma.ExecuteNonQuery(); - } + /* Sort smallest-first so size-budget batches fill cheaply at first. */ + var sorted = sourcePaths + .OrderBy(p => new FileInfo(p.Replace("/", "\\")).Length) + .ToList(); - var selectClause = BuildSelectClause(table, sourcePaths); - var pathList = string.Join(", ", sourcePaths.Select(p => $"'{EscapeSqlPath(p)}'")); - using var cmd = con.CreateCommand(); - cmd.CommandText = $"COPY (SELECT {selectClause} FROM read_parquet([{pathList}], union_by_name=true)) " + - $"TO '{EscapeSqlPath(tempPath)}' (FORMAT PARQUET, COMPRESSION ZSTD, ROW_GROUP_SIZE 8192)"; - cmd.ExecuteNonQuery(); - } - else + /* Bucket files into size-budgeted batches. Cap each batch's on-disk + parquet bytes so a single COPY doesn't try to merge an unbounded + amount of expanded VARCHAR data. Wide-row tables (query_snapshots' + plan XML) expand ~5-10x in memory on read; a 72-file backlog at + the 4 GB compaction memory_limit OOM'd on real allocation pressure + (not pre-reservation) — see #933 followup. The cap is sized so + that even with ~10x expansion the in-memory load stays well under + 4 GB. Narrow tables fit one batch with hundreds of files in it. */ + var batches = BuildSizeBudgetedBatches(sorted, MaxBatchInputBytes); + + /* Plan the output names. With one batch we keep the existing + YYYYMM_table.parquet name (backward compatible). With multiple + batches we emit YYYYMM_table_ptNNN.parquet — the archive views + already glob "*_table.parquet" so readers see them all. */ + var batchOutputs = new List<(string TempPath, string FinalPath)>(); + for (var i = 0; i < batches.Count; i++) { - /* Large group — incremental merge (pairs) to keep peak memory low. - Sort smallest-first so early merges are cheap. */ - var sorted = sourcePaths - .OrderBy(p => new FileInfo(p.Replace("/", "\\")).Length) - .ToList(); - - var currentPath = sorted[0]; - var intermediateFiles = new List(); - - for (var i = 1; i < sorted.Count; i++) - { - var stepOutput = i < sorted.Count - 1 - ? targetPath + $".step{i}.tmp" - : tempPath; - - using var con = new DuckDBConnection("DataSource=:memory:"); - con.Open(); - using (var pragma = con.CreateCommand()) - { - pragma.CommandText = $"SET memory_limit = '4GB'; SET threads = 2; SET preserve_insertion_order = false; SET temp_directory = '{EscapeSqlPath(spillDirSql)}';"; - pragma.ExecuteNonQuery(); - } - - var selectClause = BuildSelectClause(table, new[] { currentPath, sorted[i] }); - var pairList = $"'{EscapeSqlPath(currentPath)}', '{EscapeSqlPath(sorted[i])}'"; - using var cmd = con.CreateCommand(); - cmd.CommandText = $"COPY (SELECT {selectClause} FROM read_parquet([{pairList}], union_by_name=true)) " + - $"TO '{EscapeSqlPath(stepOutput)}' (FORMAT PARQUET, COMPRESSION ZSTD, ROW_GROUP_SIZE 8192)"; - cmd.ExecuteNonQuery(); - - /* Clean up previous intermediate file */ - if (intermediateFiles.Count > 0) - { - var prev = intermediateFiles[^1]; - try { File.Delete(prev); } catch { /* best effort */ } - } + var finalName = batches.Count == 1 + ? $"{targetMonth}_{table}.parquet" + : $"{targetMonth}_{table}_pt{i + 1:D3}.parquet"; + var finalPath = Path.Combine(_archivePath, finalName).Replace("\\", "/"); + batchOutputs.Add((TempPath: finalPath + ".tmp", FinalPath: finalPath)); + } - intermediateFiles.Add(stepOutput); - currentPath = stepOutput; - } + /* Run each batch's merge into its temp file. If any batch throws, + the catch below cleans up all temps and we leave the originals in + place for next cycle's retry. */ + for (var i = 0; i < batches.Count; i++) + { + MergeBatchToFile(table, batches[i], batchOutputs[i].TempPath, spillDirSql); } - /* Remove originals */ + /* All batches succeeded — delete originals, promote temps. */ var removed = 0; foreach (var f in files) { @@ -540,28 +620,34 @@ Sort smallest-first so early merges are cheap. */ } } - /* Rename temp to final */ - if (File.Exists(targetPath)) + foreach (var (tempPath, finalPath) in batchOutputs) { - File.Delete(targetPath); + if (File.Exists(finalPath)) + { + File.Delete(finalPath); + } + File.Move(tempPath, finalPath); } - File.Move(tempPath, targetPath); totalMerged++; totalRemoved += removed; - _logger?.LogDebug("Compacted {Count} files into {Target}", files.Count, targetFile); + if (batches.Count == 1) + { + _logger?.LogDebug("Compacted {Count} files into {Target}", files.Count, batchOutputs[0].FinalPath); + } + else + { + _logger?.LogInformation("Compacted {Count} files into {Parts} part files for {Month}/{Table} (input too large for single batch)", + files.Count, batches.Count, targetMonth, table); + } } catch (Exception ex) { _logger?.LogError(ex, "Failed to compact {Month}/{Table} ({Count} files)", month, table, files.Count); - /* Clean up temp and intermediate files on failure */ - if (File.Exists(tempPath)) - { - try { File.Delete(tempPath); } catch { /* best effort */ } - } - foreach (var stepFile in Directory.GetFiles(_archivePath, $"{targetMonth}_{table}.parquet.step*.tmp")) + /* Best-effort cleanup of any temp/intermediate files. */ + foreach (var stepFile in Directory.GetFiles(_archivePath, $"{targetMonth}_{table}*.tmp")) { try { File.Delete(stepFile); } catch { /* best effort */ } }