diff --git a/.env.example b/.env.example index c19992b7e2..1347c002f4 100644 --- a/.env.example +++ b/.env.example @@ -23,6 +23,10 @@ MESH_DATA_UPDATE_INTERVAL_MS=1000 # Default: 1000 MESH_EVENT_BATCH_INTERVAL_MS=1000 +# Periodic data sync interval in milliseconds +# Default: 15000 (15 seconds) +MESH_PERIODIC_DATA_SYNC_INTERVAL_MS=15000 + # Debug logging (comma-separated categories, e.g., scratch-vm:*) DEBUG= diff --git a/src/extensions/scratch3_mesh_v2/mesh-service.js b/src/extensions/scratch3_mesh_v2/mesh-service.js index b7295043ad..fec9e7a8ad 100644 --- a/src/extensions/scratch3_mesh_v2/mesh-service.js +++ b/src/extensions/scratch3_mesh_v2/mesh-service.js @@ -81,7 +81,8 @@ class MeshV2Service { 100, // min: 100ms 10000 // max: 10 seconds ); - this.dataRateLimiter = new RateLimiter(4, dataInterval, { + log.info(`Mesh V2: Data update interval set to ${dataInterval}ms`); + this.dataRateLimiter = new RateLimiter(dataInterval, { enableMerge: true, mergeKeyField: 'key' }); @@ -95,8 +96,18 @@ class MeshV2Service { 100, // min: 100ms 10000 // max: 10 seconds ); + log.info(`Mesh V2: Event batch interval set to ${this.eventBatchInterval}ms`); this.eventBatchTimer = null; + // Periodic data sync interval (default: 15000ms) + this.periodicDataSyncInterval = parseEnvInt( + process.env.MESH_PERIODIC_DATA_SYNC_INTERVAL_MS, + 15000, // default + 1000, // min: 1 second + 3600000 // max: 1 hour + ); + log.info(`Mesh V2: Periodic data sync interval set to ${this.periodicDataSyncInterval}ms`); + // Event queue limits this.MAX_EVENT_QUEUE_SIZE = 100; // 最大100イベント this.eventQueueStats = { @@ -105,8 +116,10 @@ class MeshV2Service { lastReportTime: Date.now() }; - // Last sent data to detect changes + // Last sent data to detect changes (confirmed by server) this.lastSentData = {}; + // Latest data queued for sending (may not be confirmed yet) + this.latestQueuedData = {}; // イベントキュー: {event, offsetMs} の配列 this.pendingBroadcasts = []; @@ -423,6 +436,7 @@ class MeshV2Service { this.isHost = false; this.remoteData = {}; this.lastSentData = {}; + this.latestQueuedData = {}; } startSubscriptions () { @@ -624,6 +638,7 @@ class MeshV2Service { startEventBatchTimer () { this.stopEventBatchTimer(); + log.debug(`Mesh V2: Starting event batch timer (Interval: ${this.eventBatchInterval}ms)`); this.eventBatchTimer = setInterval(() => { this.processBatchEvents(); }, this.eventBatchInterval); @@ -687,7 +702,8 @@ class MeshV2Service { this.stopHeartbeat(); if (!this.groupId) return; - log.info(`Mesh V2: Starting heartbeat timer (Role: ${this.isHost ? 'Host' : 'Member'})`); + log.info(`Mesh V2: Starting heartbeat timer (Role: ${this.isHost ? 'Host' : 'Member'}, ` + + `Interval: ${this.isHost ? this.hostHeartbeatInterval : this.memberHeartbeatInterval}s)`); const interval = (this.isHost ? this.hostHeartbeatInterval : this.memberHeartbeatInterval) * 1000; this.heartbeatTimer = setInterval(() => { @@ -802,34 +818,28 @@ class MeshV2Service { } } - /** - * Check if the data has changed since the last successful send. - * @param {Array} dataArray - Array of {key, value} objects. - * @returns {boolean} - True if data is unchanged. - */ - isDataUnchanged (dataArray) { - if (dataArray.length !== Object.keys(this.lastSentData).length) return false; - for (const item of dataArray) { - if (this.lastSentData[item.key] !== item.value) return false; - } - return true; - } - async sendData (dataArray) { if (!this.groupId || !this.client) return; - const unchanged = this.isDataUnchanged(dataArray); - log.debug(`Mesh V2: sendData called with ${dataArray.length} items: ` + - `${JSON.stringify(dataArray)} (unchanged: ${unchanged})`); + // Delta transmission: Filter out items that haven't changed since they were LAST QUEUED. + // This avoids redundant mutations if values change back within the rate-limit interval. + const filteredData = dataArray.filter(item => this.latestQueuedData[item.key] !== item.value); + + log.debug(`Mesh V2: sendData called with ${dataArray.length} items, ` + + `${filteredData.length} items changed: ${JSON.stringify(filteredData)}`); - // Change detection - if (unchanged) { + if (filteredData.length === 0) { return; } + // Update latestQueuedData IMMEDIATELY before sending to RateLimiter + filteredData.forEach(item => { + this.latestQueuedData[item.key] = item.value; + }); + try { // Save Promise to track completion (including queue time) - this.lastDataSendPromise = this.dataRateLimiter.send(dataArray, this._reportDataBound); + this.lastDataSendPromise = this.dataRateLimiter.send(filteredData, this._reportDataBound); await this.lastDataSendPromise; } catch (error) { log.error(`Mesh V2: Failed to send data: ${error}`); @@ -850,6 +860,23 @@ class MeshV2Service { async _reportData (payload) { if (!this.groupId || !this.client) return; + // Final delta check: Filter out items that haven't changed since the LAST SUCCESSFUL transmission. + // This handles cases where values changed back while an earlier mutation was in flight. + const finalPayload = payload.filter(item => this.lastSentData[item.key] !== item.value); + + if (finalPayload.length === 0) { + log.debug('Mesh V2: Skipping mutation as all data is already up-to-date on server'); + return { + data: { + reportDataByNode: { + nodeStatus: { + data: payload // Return original payload to satisfy caller expectation + } + } + } + }; + } + try { this.costTracking.mutationCount++; this.costTracking.reportDataCount++; @@ -861,14 +888,14 @@ class MeshV2Service { groupId: this.groupId, domain: this.domain, nodeId: this.meshId, - data: payload + data: finalPayload } }); const result = await this.lastDataSendPromise; // Update last sent data on success - payload.forEach(item => { + finalPayload.forEach(item => { this.lastSentData[item.key] = item.value; }); @@ -995,11 +1022,12 @@ class MeshV2Service { startPeriodicDataSync () { this.stopPeriodicDataSync(); - // Sync every 5 minutes + const interval = this.periodicDataSyncInterval; + log.info(`Mesh V2: Starting periodic data sync timer (Interval: ${interval / 1000}s)`); this.dataSyncTimer = setInterval(() => { log.info('Mesh V2: Periodic data sync'); this.fetchAllNodesData(); - }, 5 * 60 * 1000); + }, interval); } /** diff --git a/src/extensions/scratch3_mesh_v2/rate-limiter.js b/src/extensions/scratch3_mesh_v2/rate-limiter.js index 401be742ce..7223d2cc6e 100644 --- a/src/extensions/scratch3_mesh_v2/rate-limiter.js +++ b/src/extensions/scratch3_mesh_v2/rate-limiter.js @@ -3,14 +3,12 @@ const log = require('../../util/log'); /* istanbul ignore next */ class RateLimiter { /** - * @param {number} maxPerSecond - Maximum number of requests per second. * @param {number} intervalMs - Minimum interval between requests in milliseconds. * @param {object} options - Optional parameters. * @param {boolean} options.enableMerge - Whether to merge data in the queue (default: false). * @param {string} options.mergeKeyField - Field name to use as merge key (default: 'key'). */ - constructor (maxPerSecond, intervalMs, options = {}) { - this.maxPerSecond = maxPerSecond; + constructor (intervalMs, options = {}) { this.intervalMs = intervalMs; this.queue = []; this.lastSendTime = 0; diff --git a/test/unit/extension_mesh_v2_delta.js b/test/unit/extension_mesh_v2_delta.js new file mode 100644 index 0000000000..2c830092be --- /dev/null +++ b/test/unit/extension_mesh_v2_delta.js @@ -0,0 +1,108 @@ +const test = require('tap').test; +const MeshV2Service = require('../../src/extensions/scratch3_mesh_v2/mesh-service'); + +const createMockBlocks = () => ({ + runtime: { + on: () => {}, + getTargetForStage: () => ({ + variables: {} + }), + sequencer: {} + }, + opcodeFunctions: { + event_broadcast: () => {} + } +}); + +test('MeshV2Service Delta Transmission', t => { + const blocks = createMockBlocks(); + const service = new MeshV2Service(blocks, 'node1', 'domain1'); + + // Mock client and rate limiter + let mutationCount = 0; + let reportedData = null; + + service.client = { + mutate: ({variables}) => { + mutationCount++; + reportedData = variables.data; + return Promise.resolve({ + data: { + reportDataByNode: { + nodeStatus: { + data: variables.data + } + } + } + }); + } + }; + service.groupId = 'g1'; + service.domain = 'd1'; + + // Set a very short interval for rate limiter to speed up tests + service.dataRateLimiter.intervalMs = 0; + + t.test('should send all data initially', async st => { + const data = [{key: 'v1', value: '1'}, {key: 'v2', value: '2'}]; + await service.sendData(data); + + st.equal(mutationCount, 1, 'Should call mutation'); + st.same(reportedData, data, 'Should send all data'); + st.same(service.lastSentData, {v1: '1', v2: '2'}, 'Should update lastSentData'); + st.end(); + }); + + t.test('should skip sending if data is unchanged', async st => { + mutationCount = 0; + reportedData = null; + + const data = [{key: 'v1', value: '1'}, {key: 'v2', value: '2'}]; + await service.sendData(data); + + st.equal(mutationCount, 0, 'Should NOT call mutation if data is unchanged'); + st.end(); + }); + + t.test('should only send changed items (delta)', async st => { + mutationCount = 0; + reportedData = null; + + // Only v2 changed + const data = [{key: 'v1', value: '1'}, {key: 'v2', value: '3'}]; + await service.sendData(data); + + st.equal(mutationCount, 1, 'Should call mutation if some data changed'); + st.same(reportedData, [{key: 'v2', value: '3'}], 'Should ONLY send changed items'); + st.same(service.lastSentData, {v1: '1', v2: '3'}, 'Should update lastSentData for changed item'); + st.end(); + }); + + t.test('should send new items', async st => { + mutationCount = 0; + reportedData = null; + + const data = [{key: 'v1', value: '1'}, {key: 'v2', value: '3'}, {key: 'v3', value: '4'}]; + await service.sendData(data); + + st.equal(mutationCount, 1); + st.same(reportedData, [{key: 'v3', value: '4'}], 'Should send new items'); + st.end(); + }); + + t.test('should handle single item sendData calls from index.js', async st => { + mutationCount = 0; + reportedData = null; + + // index.js often calls: sendData([{key: name, value: value}]) + await service.sendData([{key: 'v1', value: '1'}]); // Unchanged + st.equal(mutationCount, 0, 'Should skip unchanged single item'); + + await service.sendData([{key: 'v1', value: 'updated'}]); // Changed + st.equal(mutationCount, 1, 'Should send changed single item'); + st.same(reportedData, [{key: 'v1', value: 'updated'}]); + st.end(); + }); + + t.end(); +}); diff --git a/test/unit/extension_mesh_v2_delta_repro.js b/test/unit/extension_mesh_v2_delta_repro.js new file mode 100644 index 0000000000..3e2b93b503 --- /dev/null +++ b/test/unit/extension_mesh_v2_delta_repro.js @@ -0,0 +1,103 @@ +const test = require('tap').test; +const MeshV2Service = require('../../src/extensions/scratch3_mesh_v2/mesh-service'); + +const createMockBlocks = () => ({ + runtime: { + on: () => {}, + getTargetForStage: () => ({ + variables: {} + }), + sequencer: {} + }, + opcodeFunctions: { + event_broadcast: () => {} + } +}); + +test('MeshV2Service Delta Transmission Redundancy Repro', t => { + const blocks = createMockBlocks(); + const service = new MeshV2Service(blocks, 'node1', 'domain1'); + + let mutationCount = 0; + let reportedPayloads = []; + + service.client = { + mutate: ({variables}) => { + mutationCount++; + reportedPayloads.push(JSON.parse(JSON.stringify(variables.data))); + // 送信に少し時間がかかることをシミュレート + return new Promise(resolve => { + setTimeout(() => { + resolve({ + data: { + reportDataByNode: { + nodeStatus: { + data: variables.data + } + } + } + }); + }, 50); + }); + } + }; + service.groupId = 'g1'; + service.domain = 'd1'; + + // インターバルを1000msに設定 + service.dataRateLimiter.intervalMs = 1000; + + t.test('should NOT send redundant data if value changes back before transmission', async st => { + // 1. データAを1にセット + const p1 = service.sendData([{key: 'A', value: '1'}]); + + // 2. 少し待って、データAを991にセット + await new Promise(resolve => setTimeout(resolve, 100)); + const p2 = service.sendData([{key: 'A', value: '991'}]); + + // 3. さらに少し待って、データAを1にセット + await new Promise(resolve => setTimeout(resolve, 100)); + const p3 = service.sendData([{key: 'A', value: '1'}]); + + // 全ての送信が完了するのを待つ + await Promise.all([p1, p2, p3]); + await service.dataRateLimiter.waitForCompletion(); + + st.equal(mutationCount, 1, 'Should only call mutation ONCE if the final state matches initial state'); + st.same(reportedPayloads[0], [{key: 'A', value: '1'}], 'The first mutation should be 1'); + + if (mutationCount > 1) { + st.fail(`Redundant mutation detected: ${JSON.stringify(reportedPayloads)}`); + } + + st.end(); + }); + + t.test('should send data if value changes and stays changed', async st => { + mutationCount = 0; + reportedPayloads = []; + service.lastSentData = {}; + service.latestQueuedData = {}; + + // 1. データAを1にセット + const p1 = service.sendData([{key: 'A', value: '1'}]); + + // 2. 少し待って、データAを991にセット + await new Promise(resolve => setTimeout(resolve, 100)); + const p2 = service.sendData([{key: 'A', value: '991'}]); + + // 3. さらに少し待って、データAを992にセット (1ではない) + await new Promise(resolve => setTimeout(resolve, 100)); + const p3 = service.sendData([{key: 'A', value: '992'}]); + + // 全ての送信が完了するのを待つ + await Promise.all([p1, p2, p3]); + await service.dataRateLimiter.waitForCompletion(); + + st.equal(mutationCount, 1, 'Should call mutation once (all changes merged into one)'); + st.same(reportedPayloads[0], [{key: 'A', value: '992'}], 'The mutation should contain the latest value'); + st.end(); + }); + + t.end(); +}); diff --git a/test/unit/rate_limiter.js b/test/unit/rate_limiter.js index b4f074ab42..12f1dbb7bb 100644 --- a/test/unit/rate_limiter.js +++ b/test/unit/rate_limiter.js @@ -2,7 +2,7 @@ const test = require('tap').test; const RateLimiter = require('../../src/extensions/scratch3_mesh_v2/rate-limiter'); test('RateLimiter Basic', t => { - const limiter = new RateLimiter(4, 10); + const limiter = new RateLimiter(10); let count = 0; const pendingResolves = []; @@ -36,7 +36,7 @@ test('RateLimiter Basic', t => { }); test('RateLimiter Merge Feature', t => { - const limiter = new RateLimiter(4, 10, { + const limiter = new RateLimiter(10, { enableMerge: true, mergeKeyField: 'key' }); @@ -82,7 +82,7 @@ test('RateLimiter Merge Feature', t => { test('RateLimiter Merge Different Keys in same Send', t => { - const limiter = new RateLimiter(4, 10, { + const limiter = new RateLimiter(10, { enableMerge: true, diff --git a/test/unit/scratch3_mesh_v2_rate_limiter_repro.js b/test/unit/scratch3_mesh_v2_rate_limiter_repro.js index 09d96cecc1..b9846b8e54 100644 --- a/test/unit/scratch3_mesh_v2_rate_limiter_repro.js +++ b/test/unit/scratch3_mesh_v2_rate_limiter_repro.js @@ -7,8 +7,8 @@ log.debug = () => {}; log.info = () => {}; test('RateLimiter stack overflow reproduction', {timeout: 60000}, async t => { - // maxPerSecond: 1, intervalMs: 250ms, enableMerge: true - const limiter = new RateLimiter(1, 250, {enableMerge: true}); + // intervalMs: 250ms, enableMerge: true + const limiter = new RateLimiter(250, {enableMerge: true}); // Immediate sendFunction const sendFunction = d => Promise.resolve(d);