Skip to content

Commit b9995db

Browse files
committed
fix: improve Databricks connection handling with retry logic
1 parent 9e29cea commit b9995db

File tree

2 files changed

+118
-32
lines changed

2 files changed

+118
-32
lines changed

.changeset/plain-heads-ring.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"ansible-database-mcp": patch
3+
---
4+
5+
improve Databricks connection handling with retry logic

src/services/db-connection/adapters/databricks-adapter.ts

Lines changed: 113 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ export class DatabricksAdapter extends EventEmitter {
3333
private pendingRequests: Array<(session: IDBSQLSession) => void>;
3434
private isDestroyed: boolean;
3535
private poolSize: { min: number; max: number };
36+
private readonly MAX_CONNECTION_RETRIES = 3;
3637

3738
constructor(config: DatabaseConfig) {
3839
super();
@@ -52,7 +53,6 @@ export class DatabricksAdapter extends EventEmitter {
5253
max: config.pool?.max || 5
5354
};
5455

55-
const connection = config.connection;
5656
this.client = new DBSQLClient();
5757
}
5858

@@ -120,6 +120,59 @@ export class DatabricksAdapter extends EventEmitter {
120120
}
121121
}
122122

123+
/**
124+
* Check if error is a connection-related error
125+
*/
126+
private isConnectionError(error: any): boolean {
127+
if (!error) return false;
128+
129+
const errorMessage = error.message || '';
130+
const statusCode = error.statusCode;
131+
132+
// Only check for explicit indicators
133+
return (
134+
statusCode === 400 &&
135+
errorMessage.includes('THTTPException')
136+
);
137+
}
138+
139+
/**
140+
* Recreate client connection
141+
*/
142+
private async recreateConnection(): Promise<void> {
143+
console.log('[Databricks] Recreating client connection...');
144+
145+
// Close existing client
146+
try {
147+
await this.client.close();
148+
} catch (error) {
149+
console.error('[Databricks] Error closing client during recreation:', error);
150+
}
151+
152+
// Clear all sessions
153+
this.sessions = [];
154+
this.availableSessions = [];
155+
this.activeSessions.clear();
156+
157+
// Create new client and connect
158+
this.client = new DBSQLClient();
159+
const connection = this.config.connection as any;
160+
161+
await this.client.connect({
162+
host: connection.host,
163+
port: connection.port || 443,
164+
path: connection.path,
165+
token: connection.token
166+
});
167+
168+
// Create minimum number of sessions
169+
for (let i = 0; i < this.poolSize.min; i++) {
170+
await this.createSession();
171+
}
172+
173+
console.log('[Databricks] Client connection recreated successfully');
174+
}
175+
123176
/**
124177
* Acquire a session from the pool
125178
*/
@@ -180,43 +233,71 @@ export class DatabricksAdapter extends EventEmitter {
180233
* Execute a raw SQL query
181234
*/
182235
async raw(sql: string, bindings?: any[]): Promise<DatabricksQueryResult> {
183-
const session = await this.acquireSession();
236+
let lastError: any;
237+
let retryCount = 0;
184238

185-
try {
186-
const operation = await session.executeStatement(sql, {
187-
runAsync: false,
188-
maxRows: 10000 // Configurable limit
189-
});
190-
191-
const result = await operation.fetchAll();
192-
193-
let fields: any[] | undefined;
194-
if (operation.getSchema) {
239+
while (retryCount <= this.MAX_CONNECTION_RETRIES) {
240+
try {
241+
const session = await this.acquireSession();
242+
195243
try {
196-
const schema = await operation.getSchema();
197-
// Convert TTableSchema to array format if needed
198-
if (schema && typeof schema === 'object' && 'columns' in schema) {
199-
fields = (schema as any).columns || [];
200-
} else {
201-
fields = undefined;
244+
const operation = await session.executeStatement(sql, {
245+
runAsync: false,
246+
maxRows: 10000 // Configurable limit
247+
});
248+
249+
const result = await operation.fetchAll();
250+
251+
let fields: any[] | undefined;
252+
if (operation.getSchema) {
253+
try {
254+
const schema = await operation.getSchema();
255+
// Convert TTableSchema to array format if needed
256+
if (schema && typeof schema === 'object' && 'columns' in schema) {
257+
fields = (schema as any).columns || [];
258+
} else {
259+
fields = undefined;
260+
}
261+
} catch {
262+
fields = undefined;
263+
}
264+
}
265+
266+
await operation.close();
267+
268+
return {
269+
rows: result,
270+
fields
271+
};
272+
} catch (error: any) {
273+
console.error('[Databricks] Query execution error:', error);
274+
throw error;
275+
} finally {
276+
this.releaseSession(session);
277+
}
278+
} catch (error: any) {
279+
lastError = error;
280+
281+
if (this.isConnectionError(error) && retryCount < this.MAX_CONNECTION_RETRIES) {
282+
console.log(`[Databricks] Connection error detected, attempting reconnection (${retryCount + 1}/${this.MAX_CONNECTION_RETRIES})`);
283+
284+
try {
285+
await this.recreateConnection();
286+
retryCount++;
287+
continue;
288+
} catch (reconnectError) {
289+
console.error('[Databricks] Failed to recreate connection:', reconnectError);
290+
throw reconnectError;
202291
}
203-
} catch {
204-
fields = undefined;
292+
} else {
293+
// Not a connection error or max retries reached
294+
throw error;
205295
}
206296
}
207-
208-
await operation.close();
209-
210-
return {
211-
rows: result,
212-
fields
213-
};
214-
} catch (error: any) {
215-
console.error('[Databricks] Query execution error:', error);
216-
throw error;
217-
} finally {
218-
this.releaseSession(session);
219297
}
298+
299+
// If we get here, all retries failed
300+
throw lastError;
220301
}
221302

222303
/**

0 commit comments

Comments
 (0)