Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/sdk/nb.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -766,7 +766,7 @@ interface DBCollection {

validate(doc: object, warn?: 'warn'): object;

executeSQL<T>(query: string, params: Array<any>, options?: { query_name?: string }): Promise<sqlResult<T>>;
executeSQL<T>(query: string, params: Array<any>, options?: { query_name?: string, preferred_pool?: string }): Promise<sqlResult<T>>;
name: any;
}

Expand Down
6 changes: 3 additions & 3 deletions src/server/bg_services/db_cleaner.js
Original file line number Diff line number Diff line change
Expand Up @@ -62,19 +62,19 @@ async function clean_md_store(last_date_to_remove) {
}
dbg.log0('DB_CLEANER: checking md-store for documents deleted before', new Date(last_date_to_remove));
const objects_to_remove = await MDStore.instance().find_deleted_objects(last_date_to_remove, config.DB_CLEANER_DOCS_LIMIT);
dbg.log2('DB_CLEANER: list objects:', objects_to_remove);
dbg.log0('DB_CLEANER: list objects:', objects_to_remove);
if (objects_to_remove.length) {
await P.map_with_concurrency(10, objects_to_remove, obj => db_delete_object_parts(obj));
await MDStore.instance().db_delete_objects(objects_to_remove);
}
const blocks_to_remove = await MDStore.instance().find_deleted_blocks(last_date_to_remove, config.DB_CLEANER_DOCS_LIMIT);
dbg.log2('DB_CLEANER: list blocks:', blocks_to_remove);
dbg.log0('DB_CLEANER: list blocks:', blocks_to_remove);
if (blocks_to_remove.length) await MDStore.instance().db_delete_blocks(blocks_to_remove);
const chunks_to_remove = await MDStore.instance().find_deleted_chunks(last_date_to_remove, config.DB_CLEANER_DOCS_LIMIT);
const filtered_chunks = chunks_to_remove.filter(async chunk =>
!(await MDStore.instance().has_any_blocks_for_chunk(chunk)) &&
!(await MDStore.instance().has_any_parts_for_chunk(chunk)));
dbg.log2('DB_CLEANER: list chunks with no blocks and no parts to be removed from DB', filtered_chunks);
dbg.log0('DB_CLEANER: list chunks with no blocks and no parts to be removed from DB', filtered_chunks);
if (filtered_chunks.length) await MDStore.instance().db_delete_chunks(filtered_chunks);
dbg.log0(`DB_CLEANER: removed ${objects_to_remove.length + blocks_to_remove.length + filtered_chunks.length} documents from md-store`);
}
Expand Down
14 changes: 11 additions & 3 deletions src/server/object_services/md_store.js
Original file line number Diff line number Diff line change
Expand Up @@ -776,6 +776,7 @@ class MDStore {
}, {
limit: Math.min(limit, 1000),
hint: 'deleted_unreclaimed_index',
preferred_pool: 'read_only',
});
return results;
}
Expand Down Expand Up @@ -1123,7 +1124,7 @@ class MDStore {
FROM ${this._objects.name}
WHERE (to_ts(data->>'deleted')<to_ts($1) and data ? 'deleted' and data ? 'reclaimed')
LIMIT ${query_limit};`;
const result = await this._objects.executeSQL(query, [new Date(max_delete_time).toISOString()]);
const result = await this._objects.executeSQL(query, [new Date(max_delete_time).toISOString()], {preferred_pool: 'read_only'});
return db_client.instance().uniq_ids(result.rows, '_id');
}

Expand Down Expand Up @@ -1599,6 +1600,7 @@ class MDStore {
_id: -1
},
limit: limit,
preferred_pool: 'read_only',
})

.then(chunks => ({
Expand Down Expand Up @@ -1773,21 +1775,26 @@ class MDStore {
projection: {
_id: 1,
deleted: 1
}
},
preferred_pool: 'read_only'
})
.then(objects => db_client.instance().uniq_ids(objects, '_id'));
}

has_any_blocks_for_chunk(chunk_id) {
return this._blocks.findOne({
chunk: { $eq: chunk_id, $exists: true },
}, {
preferred_pool: 'read_only'
})
.then(obj => Boolean(obj));
}

has_any_parts_for_chunk(chunk_id) {
return this._parts.findOne({
chunk: { $eq: chunk_id, $exists: true },
}, {
preferred_pool: 'read_only'
})
.then(obj => Boolean(obj));
}
Expand Down Expand Up @@ -2029,7 +2036,8 @@ class MDStore {
projection: {
_id: 1,
deleted: 1
}
},
preferred_pool: 'read_only'
})
.then(objects => db_client.instance().uniq_ids(objects, '_id'));
}
Expand Down
14 changes: 11 additions & 3 deletions src/util/postgres_client.js
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,9 @@ function convert_timestamps(where_clause) {

async function _do_query(pg_client, q, transaction_counter) {
query_counter += 1;

dbg.log3("pg_client.options?.host =", pg_client.options?.host, ", q =", q);

const tag = `T${_.padStart(transaction_counter, 8, '0')}|Q${_.padStart(query_counter.toString(), 8, '0')}`;
try {
// dbg.log0(`postgres_client: ${tag}: ${q.text}`, util.inspect(q.values, { depth: 6 }));
Expand Down Expand Up @@ -629,6 +632,10 @@ class PostgresTable {
get_pool(key = this.pool_key) {
const pool = this.client.get_pool(key);
if (!pool) {
//if original get_pool was no for the default this.pool_key, try also this.pool_key
if (key && key !== this.pool_key) {
return this.get_pool();
}
throw new Error(`The postgres clients pool ${key} disconnected`);
}
return pool;
Expand Down Expand Up @@ -716,13 +723,14 @@ class PostgresTable {
* @param {Array<any>} params
* @param {{
* query_name?: string,
* preferred_pool?: string,
* }} [options = {}]
*
* @returns {Promise<import('pg').QueryResult<T>>}
*/
async executeSQL(query, params, options = {}) {
/** @type {Pool} */
const pool = this.get_pool();
const pool = this.get_pool(options.preferred_pool);
const client = await pool.connect();

const q = {
Expand Down Expand Up @@ -926,7 +934,7 @@ class PostgresTable {
query_string += ` OFFSET ${sql_query.offset}`;
}
try {
const res = await this.single_query(query_string);
const res = await this.single_query(query_string, undefined, this.get_pool(options.preferred_pool));
return res.rows.map(row => decode_json(this.schema, row.data));
} catch (err) {
dbg.error('find failed', query, options, query_string, err);
Expand All @@ -943,7 +951,7 @@ class PostgresTable {
}
query_string += ' LIMIT 1';
try {
const res = await this.single_query(query_string);
const res = await this.single_query(query_string, undefined, this.get_pool(options.preferred_pool));
if (res.rowCount === 0) return null;
return res.rows.map(row => decode_json(this.schema, row.data))[0];
} catch (err) {
Expand Down