Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions lib/connection/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -714,6 +714,7 @@ function Connection(context) {
url: Url.format({
pathname: `/monitoring/queries/${queryId}`,
}),
useExperimentalRetryMiddleware: true,
};

Logger.getInstance().debug(
Expand Down
7 changes: 6 additions & 1 deletion lib/connection/statement.js
Original file line number Diff line number Diff line change
Expand Up @@ -1271,6 +1271,7 @@ function sendCancelStatement(statementContext, statement, callback) {
method: 'POST',
url: url,
json: json,
useExperimentalRetryMiddleware: true,
callback: function (err) {
// if a callback was specified, invoke it
if (Util.isFunction(callback)) {
Expand Down Expand Up @@ -1547,7 +1548,11 @@ function sendSfRequest(statementContext, options, appendQueryParamOnRetry) {
options.url = Util.url.appendRetryParam(retryOption);
}

sf.request(options);
sf.request({
...options,
// TODO: debugging
useExperimentalRetryMiddleware: true,
});
};

// replace the specified callback with a new one that retries
Expand Down
85 changes: 85 additions & 0 deletions lib/http/axiosInstance.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
import axiosLib, { AxiosError, InternalAxiosRequestConfig } from 'axios';
import * as Util from '../util';
import Logger from '../logger';
import * as requestUtil from './request_util';

type SnowflakeAxiosRequestConfig = InternalAxiosRequestConfig & {
useExperimentalRetryMiddleware?: boolean;
__snowflakeRetryConfig?: {
numRetries: number;
totalElapsedTime: number;
startingSleepTime: number;
maxNumRetries: number;
maxRetryTimeout: number;
};
};

const axios = axiosLib.create();

/*
* NOTE:
* This interceptor enables request retries when useExperimentalRetryMiddleware=true.
*
* It's marked as experimental, because it doesn't handle retry customization ATM and is intended
* to be used with endpoints that have no other retry handling.
*
* Future improvements:
* - Handle retry customization (similar to axios-retry library)
* - Support retry telemetry (retryCount and retryReason query params)
* - Consider replacing code with axios-retry
* - Support abort signal
*/
axios.interceptors.response.use(
(response) => response,
async (err: AxiosError) => {
const config = err.config ? (err.config as SnowflakeAxiosRequestConfig) : null;
if (!config || !config.useExperimentalRetryMiddleware) {
return Promise.reject(err);
}

config.__snowflakeRetryConfig ??= {
numRetries: 1,
totalElapsedTime: 0,
startingSleepTime: 1,
maxNumRetries: 7,
maxRetryTimeout: 300,
};
const { numRetries, totalElapsedTime, startingSleepTime, maxNumRetries, maxRetryTimeout } =
config.__snowflakeRetryConfig;

// TODO:
// - ensure test coverage for isRetryableNetworkError and isRetryableHttpError
// - check if we handle redirects
// - probably need to retry timeouts + test coverage
const isRetryable = err.response
? Util.isRetryableHttpError({ statusCode: err.response.status }, false)
: true; // TODO: ['ERR_CANCELED', 'ECONNABORTED']; this 2 should be ignored

if (isRetryable && numRetries <= maxNumRetries && totalElapsedTime <= maxRetryTimeout) {
// TODO:
// Util.nextSleepTime mighe be better?
const jitter = Util.getJitteredSleepTime(
numRetries,
startingSleepTime,
totalElapsedTime,
maxRetryTimeout,
);
config.__snowflakeRetryConfig.totalElapsedTime = jitter.totalElapsedTime;
config.__snowflakeRetryConfig.numRetries++;

Logger().debug(
'useExperimentalRetryMiddleware: Retrying request%s - error=%s, attempt=%s, delay=%ss',
requestUtil.describeRequestFromOptions(config),
err.message,
numRetries,
jitter.sleep,
);
await Util.sleep(jitter.sleep * 1000);
return axios.request(config);
} else {
return Promise.reject(err);
}
},
);

export default axios;
6 changes: 5 additions & 1 deletion lib/http/base.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ const zlib = require('zlib');
const Util = require('../util');
const Logger = require('../logger');
const ExecutionTimer = require('../logger/execution_timer');
const axios = require('axios');
const URL = require('node:url').URL;
const requestUtil = require('./request_util');
const axios = require('./axiosInstance').default;

const DEFAULT_REQUEST_TIMEOUT = 360000;

Expand Down Expand Up @@ -358,6 +358,9 @@ function prepareRequestOptions(options, requestHandlers = {}) {
agentClass: this._connectionConfig.agentClass,
};
}

// NOTE:
// backoffStrategy is a dead code that is not used in actual retry logic
const backoffStrategy = this.constructExponentialBackoffStrategy();
const requestOptions = {
method: options.method,
Expand All @@ -372,6 +375,7 @@ function prepareRequestOptions(options, requestHandlers = {}) {
// we manually parse jsons or other structures from the server so they need to be text
responseType: options.responseType || 'text',
proxy: false,
useExperimentalRetryMiddleware: options.useExperimentalRetryMiddleware,
...requestHandlers,
};

Expand Down
2 changes: 2 additions & 0 deletions lib/services/sf.js
Original file line number Diff line number Diff line change
Expand Up @@ -604,6 +604,7 @@ function StateAbstract(options) {
gzip: requestOptions.gzip,
json: requestOptions.json,
params: params,
useExperimentalRetryMiddleware: requestOptions.useExperimentalRetryMiddleware,
callback: async function (err, response, body) {
// if we got an error, wrap it into a network error
if (err) {
Expand Down Expand Up @@ -1418,6 +1419,7 @@ StateConnected.prototype.destroy = function (options) {
method: 'POST',
url: `/session?delete=true&requestId=${requestID}`,
scope: this,
useExperimentalRetryMiddleware: true,
callback: function (err) {
// if the destroy request succeeded or the session already expired, we're disconnected
if (
Expand Down
4 changes: 4 additions & 0 deletions lib/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -703,6 +703,10 @@ export function escapeHTML(value: string) {
.replace(/'/g, '&#39;');
}

export function sleep(sleepTimeMs: number) {
return new Promise((resolve) => setTimeout(resolve, sleepTimeMs));
}

/**
* Typescript with "module": "commonjs" will transform every import() to a require() statement.
*
Expand Down
3 changes: 3 additions & 0 deletions test/integration/testCancel.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
const async = require('async');
const testUtil = require('./testUtil');

const snowflake = require('./../../lib/snowflake');
snowflake.configure({ logLevel: 'trace' });

describe('Test Cancel Query', function () {
let connection;
const longQuery = 'select count(*) from table(generator(timeLimit => 3600))';
Expand Down
34 changes: 19 additions & 15 deletions test/integration/testHTAP.js
Original file line number Diff line number Diff line change
@@ -1,24 +1,28 @@
const assert = require('assert');
const async = require('async');
const uuid = require('uuid');
const connOption = require('./connectionOptions').valid;
const testUtil = require('./testUtil');

function getRandomDBNames() {
const dbName = 'qcc_test_db';
// TODO: remove me
const snowflake = require('./../../lib/snowflake');
snowflake.configure({ logLevel: 'trace' });

function getUniqueDBNames(amount = 3) {
const arr = [];
const randomNumber = Math.floor(Math.random() * 10000);
for (let i = 0; i < 3; i++) {
arr.push(dbName + (randomNumber + i));
for (let i = 0; i < amount; i++) {
arr.push(`qcc_test_db_${Date.now()}_${uuid.v4().replaceAll('-', '_')}`);
}
return arr;
}

// Only the AWS servers support the hybrid table in the GitHub action.
if (process.env.CLOUD_PROVIDER === 'AWS') {
describe('Query Context Cache test', function () {
this.retries(3);
this.timeout(5 * 60 * 1000);

let connection;
const dbNames = getRandomDBNames();
const dbNames = getUniqueDBNames();

beforeEach(async () => {
connection = testUtil.createConnection(connOption);
Expand Down Expand Up @@ -75,21 +79,21 @@ if (process.env.CLOUD_PROVIDER === 'AWS') {
testingfunction = function (callback) {
connection.execute({
sqlText: sqlTexts[k],
complete: function (err) {
assert.ok(!err, 'There should be no error!');
callback();
},
complete: callback,
});
};
} else {
testingfunction = function (callback) {
connection.execute({
sqlText: sqlTexts[k],
complete: function (err, stmt) {
assert.ok(!err, 'There should be no error!');
assert.strictEqual(stmt.getQueryContextCacheSize(), QccSize);
assert.strictEqual(stmt.getQueryContextDTOSize(), QccSize);
callback();
if (err) {
callback(err);
} else {
assert.strictEqual(stmt.getQueryContextCacheSize(), QccSize);
assert.strictEqual(stmt.getQueryContextDTOSize(), QccSize);
callback();
}
},
});
};
Expand Down
2 changes: 1 addition & 1 deletion test/integration/testLargeResultSet.js
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ describe('SNOW-743920:Large result set with ~35 chunks', function () {
} else {
stmt
.streamRows()
.on('error', () => done(err))
.on('error', (streamErr) => done(streamErr))
.on('data', (row) => rows.push(row))
.on('end', () => {
try {
Expand Down
30 changes: 8 additions & 22 deletions test/integration/testOcsp.js
Original file line number Diff line number Diff line change
@@ -1,18 +1,15 @@
const Os = require('os');
const async = require('async');
const assert = require('assert');
const { exec } = require('child_process');
const snowflake = require('./../../lib/snowflake');
const connOption = require('./connectionOptions');
const SocketUtil = require('./../../lib/agent/socket_util');
const OcspResponseCache = require('./../../lib/agent/ocsp_response_cache');
const Check = require('./../../lib/agent/check');
const Util = require('./../../lib/util');
const { exec } = require('child_process');
const testUtil = require('./testUtil');

const sharedLogger = require('./sharedLogger');
const Logger = require('./../../lib/logger');
Logger.getInstance().setLogger(sharedLogger.logger);
const testUtil = require('./testUtil');

describe('OCSP validation', function () {
it('OCSP validation with server reusing SSL sessions', function (done) {
Expand All @@ -25,13 +22,9 @@ describe('OCSP validation', function () {
async.series(
[
function (callback) {
connection.connect(function (err) {
assert.ok(!err, JSON.stringify(err));
callback();
});
connection.connect(callback);
},
function (callback) {
let numErrors = 0;
let numStmtsExecuted = 0;
const numStmtsTotal = 20;

Expand All @@ -42,12 +35,10 @@ describe('OCSP validation', function () {
sqlText: 'select 1;',
complete: function (err) {
if (err) {
numErrors++;
callback(err);
}

numStmtsExecuted++;
if (numStmtsExecuted === numStmtsTotal - 1) {
assert.strictEqual(numErrors, 0);
if (numStmtsExecuted === numStmtsTotal) {
callback();
}
},
Expand All @@ -71,13 +62,9 @@ describe('OCSP validation', function () {
async.series(
[
function (callback) {
connection.connect(function (err) {
assert.ok(!err, JSON.stringify(err));
callback();
});
connection.connect(callback);
},
function (callback) {
let numErrors = 0;
let numStmtsExecuted = 0;
const numStmtsTotal = 5;

Expand All @@ -89,13 +76,12 @@ describe('OCSP validation', function () {
sqlText: 'select 1;',
complete: function (err) {
if (err) {
numErrors++;
callback(err);
}

numStmtsExecuted++;
if (numStmtsExecuted === numStmtsTotal - 1) {
if (numStmtsExecuted === numStmtsTotal) {
delete process.env['SF_OCSP_TEST_CACHE_MAXAGE'];
assert.strictEqual(numErrors, 0);
callback();
}
},
Expand Down
Loading
Loading