Skip to content

Commit

Permalink
[fix] Added support of ssl connection to MySql database
Browse files Browse the repository at this point in the history
  • Loading branch information
Georgii Petrov committed Jun 3, 2024
1 parent b32dcf5 commit fda1427
Show file tree
Hide file tree
Showing 2 changed files with 137 additions and 98 deletions.
7 changes: 7 additions & 0 deletions Common/config/default.json
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,13 @@
"pool": {
"idleTimeoutMillis": 30000
}
},
"mysqlExtraOptions": {
"ssl": {
"key": "your/path/to/client-key.pem",
"cert": "your/path/to/client-cert.pem",
"ca": "your/path/to/client-ca.pem"
}
}
},
"redis": {
Expand Down
228 changes: 130 additions & 98 deletions DocService/sources/databaseConnectors/mysqlConnector.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,15 @@

'use strict';

const mysql = require('mysql2');
const mysql = require('mysql2/promise');
const connectorUtilities = require('./connectorUtilities');
const config = require('config');
const fs = require('fs/promises');

const configSql = config.get('services.CoAuthoring.sql');
const cfgTableResult = config.get('services.CoAuthoring.sql.tableResult');
const cfgTableResult = configSql.get('tableResult');

const pool = mysql.createPool({
const connectionConfiguration = {
host : configSql.get('dbHost'),
port : parseInt(configSql.get('dbPort')),
user : configSql.get('dbUser'),
Expand All @@ -49,120 +50,151 @@ const pool = mysql.createPool({
connectionLimit : configSql.get('connectionlimit'),
timezone : 'Z',
flags : '-FOUND_ROWS'
});
};

const additionalOptions = configSql.get('mysqlExtraOptions');
const configuration = Object.assign({}, connectionConfiguration, additionalOptions);
if (configuration.ssl?.key === 'your/path/to/client-key.pem') {
delete configuration.ssl;
}

let pool = null;

function sqlQuery(ctx, sqlCommand, callbackFunction, opt_noModifyRes = false, opt_noLog = false, opt_values = []) {
pool.getConnection(function(connectionError, connection) {
if (connectionError) {
if (!opt_noLog) {
ctx.logger.error('pool.getConnection error: %s', connectionError);
}
return executeQuery(ctx, sqlCommand, opt_values, opt_noModifyRes, opt_noLog).then(
result => callbackFunction?.(null, result),
error => callbackFunction?.(error)
);
}

callbackFunction?.(connectionError, null);
async function executeQuery(ctx, sqlCommand, values = [], noModifyRes = false, noLog = false) {
let connection = null;
try {
if (!pool) {
if (configuration.ssl) {
const { key, cert, ca } = configuration.ssl;
const requirementsFulfilled = key && cert && ca;

if (requirementsFulfilled) {
try {
const keyBuffer = await fs.readFile(key);
const certBuffer = await fs.readFile(cert);
const caBuffer = await fs.readFile(ca);
configuration.ssl = {
key: keyBuffer,
cert: certBuffer,
ca: caBuffer
};
} catch (error) {
delete configuration.ssl;
ctx.logger.error(`sqlQuery() SSL connection cannot be created because error occurred during reading .pem files, switching to default connection: \n${error.stack}`);
}
} else {
delete configuration.ssl;
ctx.logger.error('sqlQuery() SSL connection cannot be created because some fields are missing(needs: "key", "cert", "ca"), switching to default connection');
}
}

return;
pool = mysql.createPool(configuration);
}

let queryCallback = function (error, result) {
connection.release();
if (error && !opt_noLog) {
ctx.logger.error('_______________________error______________________');
ctx.logger.error('sqlQuery: %s sqlCommand: %s', error.code, sqlCommand);
ctx.logger.error(error);
ctx.logger.error('_____________________end_error____________________');
}
connection = await pool.getConnection();

let output;
if (!opt_noModifyRes) {
output = result?.affectedRows ? { affectedRows: result.affectedRows } : result;
} else {
output = result;
}
const result = await connection.query(sqlCommand, values);

output = output ?? { rows: [], affectedRows: 0 };
let output;
if (!noModifyRes) {
output = result[0]?.affectedRows ? { affectedRows: result[0].affectedRows } : result[0];
} else {
output = result[0];
}

callbackFunction?.(error, output);
};
return output ?? { rows: [], affectedRows: 0 };
} catch (error) {
if (!noLog) {
ctx.logger.error(`sqlQuery() error while executing query: ${sqlCommand}\n${error.stack}`);
}

connection.query(sqlCommand, opt_values, queryCallback);
});
throw error;
} finally {
if (connection) {
try {
// Put the connection back in the pool
connection.release();
} catch (error) {
if (!noLog) {
ctx.logger.error(`connection.release() error while executing query: ${sqlCommand}\n${error.stack}`);
}
}
}
}
}

function closePool() {
return new Promise((resolve, reject) => {
pool.end((error) => {
if (error) {
reject(error);
} else {
resolve();
}
});
});
return pool?.end();
}

function addSqlParameter(val, values) {
values.push(val);
function addSqlParameter(parameter, accumulatedArray) {
accumulatedArray.push(parameter);
return '?';
}

function concatParams(val1, val2) {
return `CONCAT(COALESCE(${val1}, ''), COALESCE(${val2}, ''))`;
function concatParams(firstParameter, secondParameter) {
return `CONCAT(COALESCE(${firstParameter}, ''), COALESCE(${secondParameter}, ''))`;
}

function upsert(ctx, task) {
return new Promise(function(resolve, reject) {
task.completeDefaults();
let dateNow = new Date();
let values = [];
let cbInsert = task.callback;
if (task.callback) {
let userCallback = new connectorUtilities.UserCallback();
userCallback.fromValues(task.userIndex, task.callback);
cbInsert = userCallback.toSQLInsert();
}
let p0 = addSqlParameter(task.tenant, values);
let p1 = addSqlParameter(task.key, values);
let p2 = addSqlParameter(task.status, values);
let p3 = addSqlParameter(task.statusInfo, values);
let p4 = addSqlParameter(dateNow, values);
let p5 = addSqlParameter(task.userIndex, values);
let p6 = addSqlParameter(task.changeId, values);
let p7 = addSqlParameter(cbInsert, values);
let p8 = addSqlParameter(task.baseurl, values);
let p9 = addSqlParameter(dateNow, values);
var sqlCommand = `INSERT INTO ${cfgTableResult} (tenant, id, status, status_info, last_open_date, user_index, change_id, callback, baseurl)`+
` VALUES (${p0}, ${p1}, ${p2}, ${p3}, ${p4}, ${p5}, ${p6}, ${p7}, ${p8}) ON DUPLICATE KEY UPDATE` +
` last_open_date = ${p9}`;
if (task.callback) {
let p10 = addSqlParameter(JSON.stringify(task.callback), values);
sqlCommand += `, callback = CONCAT(callback , '${connectorUtilities.UserCallback.prototype.delimiter}{"userIndex":' , (user_index + 1) , ',"callback":', ${p10}, '}')`;
}
if (task.baseurl) {
let p11 = addSqlParameter(task.baseurl, values);
sqlCommand += `, baseurl = ${p11}`;
}

sqlCommand += ', user_index = LAST_INSERT_ID(user_index + 1);';

sqlQuery(ctx, sqlCommand, function(error, result) {
if (error) {
reject(error);
} else {
const insertId = result.affectedRows === 1 ? task.userIndex : result.insertId;
//if CLIENT_FOUND_ROWS don't specify 1 row is inserted , 2 row is updated, and 0 row is set to its current values
//http://dev.mysql.com/doc/refman/5.7/en/insert-on-duplicate.html
const isInsert = result.affectedRows === 1;

resolve({ isInsert, insertId });
}
}, true, false, values);
});
async function upsert(ctx, task) {
task.completeDefaults();
const dateNow = new Date();

let cbInsert = task.callback;
if (task.callback) {
const userCallback = new connectorUtilities.UserCallback();
userCallback.fromValues(task.userIndex, task.callback);
cbInsert = userCallback.toSQLInsert();
}

const values = [];
const valuesPlaceholder = [
addSqlParameter(task.tenant, values),
addSqlParameter(task.key, values),
addSqlParameter(task.status, values),
addSqlParameter(task.statusInfo, values),
addSqlParameter(dateNow, values),
addSqlParameter(task.userIndex, values),
addSqlParameter(task.changeId, values),
addSqlParameter(cbInsert, values),
addSqlParameter(task.baseurl, values)
];

let updateStatement = `last_open_date = ${addSqlParameter(dateNow, values)}`;
if (task.callback) {
let callbackPlaceholder = addSqlParameter(JSON.stringify(task.callback), values);
updateStatement += `, callback = CONCAT(callback , '${connectorUtilities.UserCallback.prototype.delimiter}{"userIndex":' , (user_index + 1) , ',"callback":', ${callbackPlaceholder}, '}')`;
}

if (task.baseurl) {
let baseUrlPlaceholder = addSqlParameter(task.baseurl, values);
updateStatement += `, baseurl = ${baseUrlPlaceholder}`;
}

updateStatement += ', user_index = LAST_INSERT_ID(user_index + 1);';

const sqlCommand = `INSERT INTO ${cfgTableResult} (tenant, id, status, status_info, last_open_date, user_index, change_id, callback, baseurl) `+
`VALUES (${valuesPlaceholder.join(', ')}) ` +
`ON DUPLICATE KEY UPDATE ${updateStatement}`;

const result = await executeQuery(ctx, sqlCommand, values, true);
const insertId = result.affectedRows === 1 ? task.userIndex : result.insertId;
//if CLIENT_FOUND_ROWS don't specify 1 row is inserted , 2 row is updated, and 0 row is set to its current values
//http://dev.mysql.com/doc/refman/5.7/en/insert-on-duplicate.html
const isInsert = result.affectedRows === 1;

return { isInsert, insertId };
}

module.exports = {
sqlQuery,
closePool,
addSqlParameter,
concatParams,
upsert
}
module.exports.sqlQuery = sqlQuery;
module.exports.closePool = closePool;
module.exports.addSqlParameter = addSqlParameter;
module.exports.concatParams = concatParams;
module.exports.upsert = upsert;

0 comments on commit fda1427

Please sign in to comment.