Skip to content

Commit d817375

Browse files
committed
Fix SPI stack leak and tables_ map corruption during DDL WAL replay
1 parent c12c53e commit d817375

File tree

1 file changed

+23
-1
lines changed

1 file changed

+23
-1
lines changed

cpp/deeplake_pg/table_storage.cpp

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ extern "C" {
4646
#include <icm/json.hpp>
4747
#include <icm/string_map.hpp>
4848
#include <nd/none.hpp>
49+
#include <unordered_set>
4950

5051
#include <algorithm>
5152
#include <vector>
@@ -287,13 +288,20 @@ void table_storage::load_table_metadata()
287288
continue;
288289
}
289290

291+
// Snapshot tables_ keys so we can roll back C++ state on failure
292+
std::vector<Oid> tables_before;
293+
tables_before.reserve(tables_.size());
294+
for (const auto& [oid, _] : tables_) {
295+
tables_before.push_back(oid);
296+
}
297+
290298
MemoryContext saved_context = CurrentMemoryContext;
291299
ResourceOwner saved_owner = CurrentResourceOwner;
292300
BeginInternalSubTransaction(nullptr);
293301
PG_TRY();
294302
{
295303
set_catalog_only_create(true);
296-
pg::utils::spi_connector connector;
304+
SPI_connect();
297305
bool pushed_snapshot = false;
298306
if (!ActiveSnapshotSet()) {
299307
PushActiveSnapshot(GetTransactionSnapshot());
@@ -328,6 +336,7 @@ void table_storage::load_table_metadata()
328336
if (pushed_snapshot) {
329337
PopActiveSnapshot();
330338
}
339+
SPI_finish();
331340
set_catalog_only_create(false);
332341
ReleaseCurrentSubTransaction();
333342
}
@@ -339,6 +348,19 @@ void table_storage::load_table_metadata()
339348
CurrentResourceOwner = saved_owner;
340349
RollbackAndReleaseCurrentSubTransaction();
341350
FlushErrorState();
351+
352+
// Remove any tables_ entries added during the failed replay,
353+
// since the subtransaction rollback undid the catalog changes
354+
// but the C++ map entries persist.
355+
std::unordered_set<Oid> before_set(tables_before.begin(), tables_before.end());
356+
for (auto it = tables_.begin(); it != tables_.end(); ) {
357+
if (!before_set.contains(it->first)) {
358+
it = tables_.erase(it);
359+
} else {
360+
++it;
361+
}
362+
}
363+
342364
elog(WARNING, "pg_deeplake: DDL WAL replay failed (seq=%ld, tag=%s): %s (SQL: %.200s)",
343365
entry.seq, entry.command_tag.c_str(),
344366
edata->message ? edata->message : "unknown error",

0 commit comments

Comments
 (0)