From 1a7f467c09555b772a2a63f9e0733aab2f4076ff Mon Sep 17 00:00:00 2001 From: Alex Hua Date: Sat, 15 Feb 2025 18:14:32 +0800 Subject: [PATCH] refactor: Refine the aria2.js close #215 --- background.js | 18 ++-- js/aria2.js | 251 +++++++++++++++++++++++++++++++------------------- 2 files changed, 165 insertions(+), 104 deletions(-) diff --git a/background.js b/background.js index 97b3aa0..5fc499d 100644 --- a/background.js +++ b/background.js @@ -10,10 +10,13 @@ const NID_TASK_NEW = "NID_TASK_NEW"; const NID_TASK_STOPPED = "NID_TASK_STOPPED"; const NID_CAPTURED_OTHERS = "NID_CAPTURED_OTHERS"; +const INTERVAL_SHORT = 1000; +const INTERVAL_LONG = 3000; + var CurrentWindowId = 0; var CurrentTabUrl = "about:blank"; var MonitorId = -1; -var MonitorInterval = 3000; // Aria2 monitor interval 3000ms +var MonitorInterval = INTERVAL_LONG; // Aria2 monitor interval 3000ms var RemoteAria2List = []; var IconAnimController = new AnimationController(); @@ -645,6 +648,7 @@ function enableMonitor() { } function disableMonitor() { + monitorAria2(); clearInterval(MonitorId); MonitorId = -1; chrome.action.setBadgeText({ text: "" }); @@ -664,6 +668,8 @@ async function monitorAria2() { for (const i in RemoteAria2List) { const remoteAria2 = RemoteAria2List[i]; try { + if (!remoteAria2.socket) remoteAria2.openSocket(); + let response = await remoteAria2.getGlobalStat(); if (response && response.error) { throw response.error; @@ -679,8 +685,6 @@ async function monitorAria2() { chrome.downloads.onDeterminingFilename.addListener(captureDownload); } - if (!remoteAria2.socket) remoteAria2.openSocket(); - // Only for default aria2, needs Aria2 enhanced version const percent = response.result.percentActive; if (i == 0 && percent && !isNaN(percent)) { @@ -705,8 +709,8 @@ async function monitorAria2() { } if (active > 0) { - if (MonitorInterval == 3000) { - MonitorInterval = 1000; + if (MonitorInterval == INTERVAL_LONG) { + MonitorInterval = INTERVAL_SHORT; disableMonitor(); enableMonitor(); } @@ -715,8 +719,8 @@ async function monitorAria2() { else chrome.power.releaseKeepAwake(); } else if (active == 0) { - if (MonitorInterval == 1000) { - MonitorInterval = 3000; + if (MonitorInterval == INTERVAL_SHORT) { + MonitorInterval = INTERVAL_LONG; disableMonitor(); enableMonitor(); } diff --git a/js/aria2.js b/js/aria2.js index 2baba7a..a987484 100644 --- a/js/aria2.js +++ b/js/aria2.js @@ -1,11 +1,16 @@ import Utils from "./utils.js"; const DEBUG = false; +const SOCKET_TIMEOUT = 30 * 1000; const DEFAULT_ARIA2 = { name: "Aria2", rpcUrl: "http://localhost:6800/jsonrpc", secretKey: '' }; class Aria2 { static RequestId = 0; + #isLocalhost; + #messageHandlers; + #pendingRequests; + /** * @param {object} Aria2 * @param {string} Aria2.name @@ -14,8 +19,10 @@ class Aria2 { */ constructor(aria2 = DEFAULT_ARIA2) { Object.assign(this, aria2); - this._isLocalhost = Utils.isLocalhost(this.rpcUrl); - this._messageHandlers = new Set(); + this.#isLocalhost = Utils.isLocalhost(this.rpcUrl); + this.#messageHandlers = new Set(); + this.#pendingRequests = new Map(); // Used to manage pending RPC requests + this.socket = null; } get sid() { @@ -23,47 +30,65 @@ class Aria2 { } get isLocalhost() { - return this._isLocalhost; + return this.#isLocalhost; } /** - * Change the remote rpc url and secret key. This will - * cause existing websocket to be closed. - * - * @param {string} rpcUrl Aria2 rpc url - * @param {string} secretKey Aria2 rpc secret key + * Change the remote RPC URL and secretKey, closing any existing WebSocket connection. + * @param {string} rpcUrl Aria2 RPC URL + * @param {string} secretKey Aria2 RPC secretKey */ setRemote(name = "Aria2", rpcUrl, secretKey = '') { - if (rpcUrl == this.rpcUrl && secretKey == this.secretKey) + if (rpcUrl === this.rpcUrl && secretKey === this.secretKey) return this; - if (!Utils.validateRpcUrl(rpcUrl)) + if (Utils.validateRpcUrl(rpcUrl) !== 'VALID') throw new Error("Invalid RPC URL!"); - if (this.socket) { - this.socket.close(); - this.socket = null; - } - this._isLocalhost = Utils.isLocalhost(rpcUrl); - return Object.assign(this, { name, rpcUrl, secretKey }); + this.closeSocket(); + this.#isLocalhost = Utils.isLocalhost(rpcUrl); + Object.assign(this, { name, rpcUrl, secretKey }); + return this; } + /** + * Open the WebSocket connection and register global event listeners. + */ openSocket() { let url = this.rpcUrl; if (url.startsWith("http")) - url = url.replace("http", "ws"); - - if (this.socket && this.socket.url == url && this.socket.readyState <= 1) + url = url.replace(/^http/, "ws"); + if (this.socket && this.socket.url === url && this.socket.readyState <= WebSocket.OPEN) return this.socket; try { this.socket = new WebSocket(url); + + // Register a global message event listener to handle all RPC responses and subscription messages. this.socket.addEventListener('message', this.#onMessage.bind(this)); + + // Error event: notify all pending requests about the error. + this.socket.addEventListener('error', (event) => { + this.#pendingRequests.forEach(({ reject }) => { + reject(new Error("WebSocket encountered an error")); + }); + this.#pendingRequests.clear(); + }); + + // Close event: also notify all pending requests. + this.socket.addEventListener('close', (event) => { + this.#pendingRequests.forEach(({ reject }) => { + reject(new Error("WebSocket closed")); + }); + this.#pendingRequests.clear(); + }); } catch (error) { - this.socket = null; console.error(error.message); } - return this.socket + return this.socket; } + /** + * Close the WebSocket connection. + */ closeSocket() { if (this.socket) { this.socket.close(); @@ -71,139 +96,171 @@ class Aria2 { } } + /** + * Global message handler that dispatches messages to the corresponding pending requests + * and to all registered external message handlers. + * @param {MessageEvent} event + */ #onMessage(event) { + let data; try { - let data = JSON.parse(event.data); + data = JSON.parse(event.data); data.source = this; - this._messageHandlers.forEach((handle) => { - handle(data); - }) } catch (error) { - console.error(`${this.name} get an invalid message`); + console.error(`${this.name} received invalid message: ${event.data}`); + return; + } + // If the message includes an id, check if it corresponds to a pending request. + if (data.id !== undefined && this.#pendingRequests.has(data.id)) { + const { resolve } = this.#pendingRequests.get(data.id); + resolve(data); + this.#pendingRequests.delete(data.id); } + // Dispatch the message to all registered message handlers. + this.#messageHandlers.forEach((handler) => { + try { + handler(data); + } catch (error) { + console.error("Message handler error:", error); + } + }); } + /** + * Register a message handler. + * @param {function} messageHandler + */ regMessageHandler(messageHandler) { - if (typeof messageHandler != 'function') { + if (typeof messageHandler !== 'function') { throw new Error("Invalid aria2 message handler"); } - this._messageHandlers.add(messageHandler); + this.#messageHandlers.add(messageHandler); } + /** + * Unregister a message handler. + * @param {function} messageHandler + */ unRegMessageHandler(messageHandler) { - if (typeof messageHandler != 'function') { + if (typeof messageHandler !== 'function') { throw new Error("Invalid aria2 message handler"); } - this._messageHandlers.delete(messageHandler); + this.#messageHandlers.delete(messageHandler); } + /** + * Internal method to send an RPC request (supports both WebSocket and HTTP fetch). + * @param {object} request Request object containing id, url, and payload. + * @returns {Promise} + */ async #doRPC(request) { - let url = request.url; - let socket = this.socket; - return new Promise((resolve, reject) => { - /* via websocket */ - if (socket) { - switch (socket.readyState) { - case 0: - socket.onopen = (event) => { - socket.send(request.payload); - }; - break; - case 1: - socket.send(request.payload); - break; - case 2: - case 3: - this.socket = null; - let error = new Error("Aria2 is unreachable"); - reject(error); - break; - default: - error = new Error("Unknown socket state:", socket.readyState); - reject(error); + // Prefer using WebSocket if it is open. + if (this.socket && this.socket.readyState === WebSocket.OPEN) { + return new Promise((resolve, reject) => { + // Add the request to the pending queue. + this.#pendingRequests.set(request.id, { resolve, reject }); + try { + this.socket.send(request.payload); + } catch (error) { + this.#pendingRequests.delete(request.id); + return reject(error); } - socket.onmessage = (event) => { - let response = JSON.parse(event.data); - if (response.id == request.id) - resolve(response); - }; - socket.onclose = (event) => { - this.socket = null; - reject(Error("Websocket is closed")); - }; - socket.onerror = (event) => { - this.socket = null; - reject(Error("Aria2 is unreachable")); - }; - } else { /* via http fetch */ - if (url.startsWith("ws")) - url = url.replace("ws", "http"); - - fetch(url, { + // Set a timeout (e.g., 30 seconds) for the RPC request. + setTimeout(() => { + if (this.#pendingRequests.has(request.id)) { + this.#pendingRequests.delete(request.id); + reject(new Error("RPC request timeout")); + } + }, SOCKET_TIMEOUT); + }); + } else { // Use HTTP fetch as a fallback. + let url = request.url; + if (url.startsWith("ws")) + url = url.replace(/^ws/, "http"); + try { + const response = await fetch(url, { method: "POST", body: request.payload, headers: { "Accept": "application/json", "Content-Type": "application/json" } - }).then(response => { - resolve(response.json()); - }).catch(error => { - reject(error); }); + return await response.json(); + } catch (error) { + this.closeSocket(); // Fetch error means aria2 is offline, so close socket. + return Promise.reject(error); } - }); + } } /** - * Build rpc request - * @param method {string} Aria2 rpc method - * @param params {array} Rpc params array + * Build the RPC request payload. + * @param {string} method Aria2 RPC method. + * @param {...any} params Parameter list. + * @returns {object} Request object containing id, url, and payload. */ #buildRequest(method, ...params) { - let id = this.sid; - let request = { id, url: this.rpcUrl, payload: '' }; + const id = this.sid; + const request = { id, url: this.rpcUrl, payload: '' }; if (this.secretKey) { params.unshift("token:" + this.secretKey); } request.payload = JSON.stringify({ - "jsonrpc": "2.0", - "method": method, - "id": id, - "params": params + jsonrpc: "2.0", + method: method, + id: id, + params: params }); return request; } + /** + * Add a download task. + * @param {string|string[]} uris Download URL or an array of URLs. + * @param {object} options Options for the download task. + */ addUri(uris, options) { - if (!Array.isArray(uris)) - uris = [uris]; - let request = this.#buildRequest("aria2.addUri", uris, options); + if (!Array.isArray(uris)) uris = [uris]; + const request = this.#buildRequest("aria2.addUri", uris, options); return this.#doRPC(request); } + /** + * Get the global status. + */ getGlobalStat() { - let request = this.#buildRequest("aria2.getGlobalStat"); + const request = this.#buildRequest("aria2.getGlobalStat"); return this.#doRPC(request); } + /** + * Get the file list for a task. + * @param {string} gid Task gid. + */ getFiles(gid) { - let request = this.#buildRequest("aria2.getFiles", gid); + const request = this.#buildRequest("aria2.getFiles", gid); return this.#doRPC(request); } + /** + * Get the status of a task. + * @param {string} gid Task gid. + * @param {string[]} keys Status keys (optional). + */ tellStatus(gid, keys) { - let request = null; - if (Array.isArray(keys) && keys.length > 0) { - request = this.#buildRequest("aria2.tellStatus", gid, keys); - } else { - request = this.#buildRequest("aria2.tellStatus", gid); - } + const request = (Array.isArray(keys) && keys.length > 0) + ? this.#buildRequest("aria2.tellStatus", gid, keys) + : this.#buildRequest("aria2.tellStatus", gid); return this.#doRPC(request); } + /** + * Change the global options. + * @param {object} options Global options. + */ setGlobalOptions(options) { - let request = this.#buildRequest("aria2.changeGlobalOption", options); + const request = this.#buildRequest("aria2.changeGlobalOption", options); return this.#doRPC(request); } }