Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ MESH_DATA_UPDATE_INTERVAL_MS=1000
# Default: 1000
MESH_EVENT_BATCH_INTERVAL_MS=1000

# Periodic data sync interval in milliseconds
# Default: 15000 (15 seconds)
MESH_PERIODIC_DATA_SYNC_INTERVAL_MS=15000

# Debug logging (comma-separated categories, e.g., scratch-vm:*)
DEBUG=

Expand Down
80 changes: 54 additions & 26 deletions src/extensions/scratch3_mesh_v2/mesh-service.js
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ class MeshV2Service {
100, // min: 100ms
10000 // max: 10 seconds
);
this.dataRateLimiter = new RateLimiter(4, dataInterval, {
log.info(`Mesh V2: Data update interval set to ${dataInterval}ms`);
this.dataRateLimiter = new RateLimiter(dataInterval, {
enableMerge: true,
mergeKeyField: 'key'
});
Expand All @@ -95,8 +96,18 @@ class MeshV2Service {
100, // min: 100ms
10000 // max: 10 seconds
);
log.info(`Mesh V2: Event batch interval set to ${this.eventBatchInterval}ms`);
this.eventBatchTimer = null;

// Periodic data sync interval (default: 15000ms)
this.periodicDataSyncInterval = parseEnvInt(
process.env.MESH_PERIODIC_DATA_SYNC_INTERVAL_MS,
15000, // default
1000, // min: 1 second
3600000 // max: 1 hour
);
log.info(`Mesh V2: Periodic data sync interval set to ${this.periodicDataSyncInterval}ms`);

// Event queue limits
this.MAX_EVENT_QUEUE_SIZE = 100; // 最大100イベント
this.eventQueueStats = {
Expand All @@ -105,8 +116,10 @@ class MeshV2Service {
lastReportTime: Date.now()
};

// Last sent data to detect changes
// Last sent data to detect changes (confirmed by server)
this.lastSentData = {};
// Latest data queued for sending (may not be confirmed yet)
this.latestQueuedData = {};

// イベントキュー: {event, offsetMs} の配列
this.pendingBroadcasts = [];
Expand Down Expand Up @@ -423,6 +436,7 @@ class MeshV2Service {
this.isHost = false;
this.remoteData = {};
this.lastSentData = {};
this.latestQueuedData = {};
}

startSubscriptions () {
Expand Down Expand Up @@ -624,6 +638,7 @@ class MeshV2Service {

startEventBatchTimer () {
this.stopEventBatchTimer();
log.debug(`Mesh V2: Starting event batch timer (Interval: ${this.eventBatchInterval}ms)`);
this.eventBatchTimer = setInterval(() => {
this.processBatchEvents();
}, this.eventBatchInterval);
Expand Down Expand Up @@ -687,7 +702,8 @@ class MeshV2Service {
this.stopHeartbeat();
if (!this.groupId) return;

log.info(`Mesh V2: Starting heartbeat timer (Role: ${this.isHost ? 'Host' : 'Member'})`);
log.info(`Mesh V2: Starting heartbeat timer (Role: ${this.isHost ? 'Host' : 'Member'}, ` +
`Interval: ${this.isHost ? this.hostHeartbeatInterval : this.memberHeartbeatInterval}s)`);
const interval = (this.isHost ? this.hostHeartbeatInterval : this.memberHeartbeatInterval) * 1000;

this.heartbeatTimer = setInterval(() => {
Expand Down Expand Up @@ -802,34 +818,28 @@ class MeshV2Service {
}
}

/**
* Check if the data has changed since the last successful send.
* @param {Array} dataArray - Array of {key, value} objects.
* @returns {boolean} - True if data is unchanged.
*/
isDataUnchanged (dataArray) {
if (dataArray.length !== Object.keys(this.lastSentData).length) return false;
for (const item of dataArray) {
if (this.lastSentData[item.key] !== item.value) return false;
}
return true;
}

async sendData (dataArray) {
if (!this.groupId || !this.client) return;

const unchanged = this.isDataUnchanged(dataArray);
log.debug(`Mesh V2: sendData called with ${dataArray.length} items: ` +
`${JSON.stringify(dataArray)} (unchanged: ${unchanged})`);
// Delta transmission: Filter out items that haven't changed since they were LAST QUEUED.
// This avoids redundant mutations if values change back within the rate-limit interval.
const filteredData = dataArray.filter(item => this.latestQueuedData[item.key] !== item.value);

log.debug(`Mesh V2: sendData called with ${dataArray.length} items, ` +
`${filteredData.length} items changed: ${JSON.stringify(filteredData)}`);

// Change detection
if (unchanged) {
if (filteredData.length === 0) {
return;
}

// Update latestQueuedData IMMEDIATELY before sending to RateLimiter
filteredData.forEach(item => {
this.latestQueuedData[item.key] = item.value;
});

try {
// Save Promise to track completion (including queue time)
this.lastDataSendPromise = this.dataRateLimiter.send(dataArray, this._reportDataBound);
this.lastDataSendPromise = this.dataRateLimiter.send(filteredData, this._reportDataBound);
await this.lastDataSendPromise;
} catch (error) {
log.error(`Mesh V2: Failed to send data: ${error}`);
Expand All @@ -850,6 +860,23 @@ class MeshV2Service {
async _reportData (payload) {
if (!this.groupId || !this.client) return;

// Final delta check: Filter out items that haven't changed since the LAST SUCCESSFUL transmission.
// This handles cases where values changed back while an earlier mutation was in flight.
const finalPayload = payload.filter(item => this.lastSentData[item.key] !== item.value);

if (finalPayload.length === 0) {
log.debug('Mesh V2: Skipping mutation as all data is already up-to-date on server');
return {
data: {
reportDataByNode: {
nodeStatus: {
data: payload // Return original payload to satisfy caller expectation
}
}
}
};
}

try {
this.costTracking.mutationCount++;
this.costTracking.reportDataCount++;
Expand All @@ -861,14 +888,14 @@ class MeshV2Service {
groupId: this.groupId,
domain: this.domain,
nodeId: this.meshId,
data: payload
data: finalPayload
}
});

const result = await this.lastDataSendPromise;

// Update last sent data on success
payload.forEach(item => {
finalPayload.forEach(item => {
this.lastSentData[item.key] = item.value;
});

Expand Down Expand Up @@ -995,11 +1022,12 @@ class MeshV2Service {
startPeriodicDataSync () {
this.stopPeriodicDataSync();

// Sync every 5 minutes
const interval = this.periodicDataSyncInterval;
log.info(`Mesh V2: Starting periodic data sync timer (Interval: ${interval / 1000}s)`);
this.dataSyncTimer = setInterval(() => {
log.info('Mesh V2: Periodic data sync');
this.fetchAllNodesData();
}, 5 * 60 * 1000);
}, interval);
}

/**
Expand Down
4 changes: 1 addition & 3 deletions src/extensions/scratch3_mesh_v2/rate-limiter.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,12 @@ const log = require('../../util/log');
/* istanbul ignore next */
class RateLimiter {
/**
* @param {number} maxPerSecond - Maximum number of requests per second.
* @param {number} intervalMs - Minimum interval between requests in milliseconds.
* @param {object} options - Optional parameters.
* @param {boolean} options.enableMerge - Whether to merge data in the queue (default: false).
* @param {string} options.mergeKeyField - Field name to use as merge key (default: 'key').
*/
constructor (maxPerSecond, intervalMs, options = {}) {
this.maxPerSecond = maxPerSecond;
constructor (intervalMs, options = {}) {
this.intervalMs = intervalMs;
this.queue = [];
this.lastSendTime = 0;
Expand Down
108 changes: 108 additions & 0 deletions test/unit/extension_mesh_v2_delta.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
const test = require('tap').test;
const MeshV2Service = require('../../src/extensions/scratch3_mesh_v2/mesh-service');

const createMockBlocks = () => ({
runtime: {
on: () => {},
getTargetForStage: () => ({
variables: {}
}),
sequencer: {}
},
opcodeFunctions: {
event_broadcast: () => {}
}
});

test('MeshV2Service Delta Transmission', t => {
const blocks = createMockBlocks();
const service = new MeshV2Service(blocks, 'node1', 'domain1');

// Mock client and rate limiter
let mutationCount = 0;
let reportedData = null;

service.client = {
mutate: ({variables}) => {
mutationCount++;
reportedData = variables.data;
return Promise.resolve({
data: {
reportDataByNode: {
nodeStatus: {
data: variables.data
}
}
}
});
}
};
service.groupId = 'g1';
service.domain = 'd1';

// Set a very short interval for rate limiter to speed up tests
service.dataRateLimiter.intervalMs = 0;

t.test('should send all data initially', async st => {
const data = [{key: 'v1', value: '1'}, {key: 'v2', value: '2'}];
await service.sendData(data);

st.equal(mutationCount, 1, 'Should call mutation');
st.same(reportedData, data, 'Should send all data');
st.same(service.lastSentData, {v1: '1', v2: '2'}, 'Should update lastSentData');
st.end();
});

t.test('should skip sending if data is unchanged', async st => {
mutationCount = 0;
reportedData = null;

const data = [{key: 'v1', value: '1'}, {key: 'v2', value: '2'}];
await service.sendData(data);

st.equal(mutationCount, 0, 'Should NOT call mutation if data is unchanged');
st.end();
});

t.test('should only send changed items (delta)', async st => {
mutationCount = 0;
reportedData = null;

// Only v2 changed
const data = [{key: 'v1', value: '1'}, {key: 'v2', value: '3'}];
await service.sendData(data);

st.equal(mutationCount, 1, 'Should call mutation if some data changed');
st.same(reportedData, [{key: 'v2', value: '3'}], 'Should ONLY send changed items');
st.same(service.lastSentData, {v1: '1', v2: '3'}, 'Should update lastSentData for changed item');
st.end();
});

t.test('should send new items', async st => {
mutationCount = 0;
reportedData = null;

const data = [{key: 'v1', value: '1'}, {key: 'v2', value: '3'}, {key: 'v3', value: '4'}];
await service.sendData(data);

st.equal(mutationCount, 1);
st.same(reportedData, [{key: 'v3', value: '4'}], 'Should send new items');
st.end();
});

t.test('should handle single item sendData calls from index.js', async st => {
mutationCount = 0;
reportedData = null;

// index.js often calls: sendData([{key: name, value: value}])
await service.sendData([{key: 'v1', value: '1'}]); // Unchanged
st.equal(mutationCount, 0, 'Should skip unchanged single item');

await service.sendData([{key: 'v1', value: 'updated'}]); // Changed
st.equal(mutationCount, 1, 'Should send changed single item');
st.same(reportedData, [{key: 'v1', value: 'updated'}]);
st.end();
});

t.end();
});
Loading
Loading