Skip to content

Commit b3e1193

Browse files
committed
Add writeSyncLog
1 parent a050bd0 commit b3e1193

File tree

6 files changed

+56
-0
lines changed

6 files changed

+56
-0
lines changed

orm.js

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,10 @@ class Orm extends EventEmitter {
8989
getOptions(collectionName, dbName) {
9090
}
9191

92+
//mock
93+
writeSyncLog(type, mess) {
94+
}
95+
9296
plugin(plugin, ...args) {
9397
const _this = new Proxy(Orm, proxyHandlerFactory(this));
9498
plugin(_this, ...args);

sync/sync-bulk-utils.js

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
const _ = require('lodash')
22
const jsonfn = require('json-fn')
3+
const { EVENT_CONSTANT } = require('./sync-log')
34

45
module.exports = function (orm) {
56
orm.doCreateBulk = doCreateBulk
@@ -163,6 +164,7 @@ module.exports = function (orm) {
163164
await orm.removeFakeOfCollection(col, {})
164165
while (true) {
165166
try {
167+
orm.writeSyncLog(EVENT_CONSTANT.DO_BULK_QUERY, bulkOp.map(op => op._id))
166168
if (!bulkOp.length) return
167169
await orm(col).bulkWrite(bulkOp).direct()
168170
await orm('CommitData').updateOne({}, { [`highestCommitIdOfCollection.${col}`]: _.last(bulkOp).id })
@@ -173,9 +175,11 @@ module.exports = function (orm) {
173175
&& e.errors[1].writeErrors.length && e.errors[1].writeErrors[0].err && e.errors[1].writeErrors[0].err.index !== undefined) {
174176
const index = e.errors[1].writeErrors[0].err.index
175177
await orm('CommitData').updateOne({}, { [`highestCommitIdOfCollection.${col}`]: bulkOp[index].id })
178+
orm.writeSyncLog(EVENT_CONSTANT.BULK_ERROR, bulkOp[index]._id)
176179
orm.setHighestCommitIdOfCollection(col, bulkOp[index].id)
177180
bulkOp = bulkOp.slice(index + 1)
178181
} else {
182+
orm.writeSyncLog(EVENT_CONSTANT.DO_BULK_QUERY, bulkOp.map(op => op._id))
179183
console.error('Wrong thing happened in bulk write', e.message)
180184
return
181185
}
@@ -187,6 +191,7 @@ module.exports = function (orm) {
187191
const bulkOp = {}
188192
if (!commits.length)
189193
return
194+
orm.writeSyncLog(EVENT_CONSTANT.EXEC_BULK, commits.map(commit => commit._id))
190195
for (const commit of commits) {
191196
const { value: highestId } = await orm.emit('getHighestCommitId', commit.dbName)
192197
if (commit.id <= highestId) continue
@@ -197,6 +202,7 @@ module.exports = function (orm) {
197202
_query.chain[0].fn = convertedQuery[_query.chain[0].fn]
198203
const convertedChain = convertChain(_query.chain)
199204
convertedChain.id = commit.id
205+
convertedChain._id = commit._id
200206
if (convertedChain) {
201207
const run = !(await orm.emit(`commit:handler:shouldNotExecCommand:${commit.collectionName}`, commit));
202208
if (!run) continue

sync/sync-flow.js

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
const AwaitLock = require('await-lock').default;
22
const _ = require('lodash');
33
const jsonFn = require('json-fn')
4+
const { EVENT_CONSTANT } = require('./sync-log')
45

56
module.exports = function (orm, role) {
67
let masterDbMap = (orm.mode === 'multi' ? {} : false)
@@ -47,13 +48,15 @@ module.exports = function (orm, role) {
4748
}
4849
//todo: [process:commit] can return array
4950
let _commit = _.cloneDeep(commit)
51+
orm.writeSyncLog(EVENT_CONSTANT.START_FLOW, commit)
5052
await orm.emit(`process:commit:${commit.collectionName}`, _commit, target)
5153
if (_commit.tags) {
5254
for (const tag of _commit.tags) {
5355
await orm.emit(`process:commit:${tag}`, _commit)
5456
}
5557
}
5658
commit = _commit;
59+
orm.writeSyncLog(EVENT_CONSTANT.AFTER_FIRST_PROCESS, commit)
5760

5861
let value
5962
if (!checkMaster(commit.dbName)) {
@@ -65,6 +68,7 @@ module.exports = function (orm, role) {
6568
commit.fromClient = (await orm.emit('getCommitDataId')).value
6669
commit.createdDate = new Date()
6770
commit._fakeId = fakeId
71+
orm.writeSyncLog(EVENT_CONSTANT.SEND_TO_MASTER, commit._id)
6872
orm.emit('transport:toMaster', commit)
6973
fakeId += 1
7074
await orm('CommitData').updateOne({}, { fakeId })
@@ -73,6 +77,7 @@ module.exports = function (orm, role) {
7377
commit.fromMaster = true;
7478
const lock = new AwaitLock()
7579
await lock.acquireAsync()
80+
orm.writeSyncLog(EVENT_CONSTANT.START_EXECUTE)
7681
orm.once(`commit:result:${commit._id.toString()}`, function (result) {
7782
value = result
7883
lock.release()
@@ -117,17 +122,20 @@ module.exports = function (orm, role) {
117122

118123
orm.onQueue('update:Commit:c', 'fake-channel', async function (commit) {
119124
if (!commit.id) return
125+
orm.writeSyncLog(EVENT_CONSTANT.FLOW_EXEC, commit._id)
120126
if (!checkMaster(commit.dbName)) {
121127
await orm.emit('commit:update-fake', commit);
122128
}
123129
const run = !(await orm.emit(`commit:handler:shouldNotExecCommand:${commit.collectionName}`, commit));
130+
orm.writeSyncLog(EVENT_CONSTANT.FLOW_SHOULD_NOT_EXEC, commit._id + ' ' + run)
124131
let result
125132
if (run) {
126133
let query = orm.getQuery(commit)
127134
if (commit.dbName) query.name += `@${commit.dbName}`
128135
try {
129136
result = await orm.execChain(query)
130137
} catch (e) {
138+
orm.writeSyncLog(EVENT_CONSTANT.FLOW_EXEC_ERROR, commit._id + ' ' + e.message)
131139
console.error('Error on query', jsonFn.stringify(query), 'is', e)
132140
await orm.emit('commit:report:errorExec', commit.id, e.message)
133141
}

sync/sync-log.js

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
module.exports.EVENT_CONSTANT = {
2+
START_FLOW: 0,
3+
AFTER_FIRST_PROCESS: 1,
4+
SEND_TO_MASTER: 2,
5+
BUILD_FAKE: 3,
6+
ADDED_TO_QUEUE: 4,
7+
DO_SEND: 5,
8+
FINISH_SEND: 6,
9+
COMMIT_REQUEST: 7,
10+
11+
START_EXECUTE: 100,
12+
CREATE_COMMIT: 101,
13+
PERFECT_FORM: 102,
14+
15+
REQUIRE_SYNC: 199,
16+
EXEC_BULK: 200,
17+
DO_BULK_QUERY: 201,
18+
BULK_ERROR: 202,
19+
CREATE_COMMIT_CLIENT: 203,
20+
FLOW_EXEC: 204,
21+
FLOW_SHOULD_NOT_EXEC: 205,
22+
FLOW_EXEC_ERROR: 206
23+
}

sync/sync-plugin-multi.js

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ const replaceMasterUtils = require('./sync-utils/replace-master')
1010
const storeOldCommits = require('./sync-utils/store-old-commits')
1111
const {handleExtraProps} = require('./sync-handle-extra-props');
1212
const AwaitLock = require('await-lock').default
13+
const { EVENT_CONSTANT } = require('./sync-log')
1314

1415
const syncPlugin = function (orm) {
1516
const whitelist = []
@@ -420,6 +421,7 @@ const syncPlugin = function (orm) {
420421

421422
orm.onQueue("commit:build-fake", 'fake-channel', async function (query, target, commit) {
422423
if (!commit.chain) return;
424+
orm.writeSyncLog(EVENT_CONSTANT.BUILD_FAKE, commit._id)
423425
const isDeleteCmd = target.cmd.includes('delete') || target.cmd.includes('remove')
424426
const fakeCollectionName = 'Recovery' + commit.collectionName
425427
const _query = orm.getQuery(commit)
@@ -621,6 +623,8 @@ const syncPlugin = function (orm) {
621623

622624
orm.onQueue('transport:requireSync:callback', async function (commits) {
623625
if (!commits || !commits.length) return
626+
const _idsList = commits.map(commit => commit._id)
627+
orm.writeSyncLog(EVENT_CONSTANT.REQUIRE_SYNC, _idsList)
624628
const archivedCommits = _.remove(commits, commit => commit.data && !!commit.data.__arc)
625629
if (commits.length > COMMIT_BULK_WRITE_THRESHOLD) {
626630
await validateCommits(commits)
@@ -719,6 +723,7 @@ const syncPlugin = function (orm) {
719723
orm.onQueue('createCommit', async function (commit) {
720724
if (lock.acquired)
721725
return
726+
orm.writeSyncLog(orm.isMaster() ? EVENT_CONSTANT.CREATE_COMMIT : EVENT_CONSTANT.CREATE_COMMIT_CLIENT, commit._id)
722727
if (!commit.id) {
723728
let { value: highestId } = await orm.emit('getHighestCommitId', commit.dbName);
724729
if (highestIdInMemory)
@@ -738,6 +743,7 @@ const syncPlugin = function (orm) {
738743
if (orm.isMaster()) {
739744
commit.execDate = new Date()
740745
commit.isPending = true
746+
orm.writeSyncLog(EVENT_CONSTANT.PERFECT_FORM, commit)
741747
}
742748
this.value = await orm(`Commit`, commit.dbName).create(commit);
743749
} catch (e) {
@@ -803,6 +809,7 @@ const syncPlugin = function (orm) {
803809

804810
orm.onQueue('commitRequest', async function (commits) {
805811
try {
812+
orm.writeSyncLog(EVENT_CONSTANT.COMMIT_REQUEST, commits.map(commit => commit._id))
806813
if (!commits || !commits.length)
807814
return
808815
for (let commit of commits) {

sync/sync-queue-commit.js

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
const QUEUE_COMMIT_MODEL = 'QueueCommit'
22
const dayjs = require('dayjs')
33
const _ = require('lodash')
4+
const { EVENT_CONSTANT } = require('./sync-log')
45

56
module.exports = function (orm) {
67
let disabled = false
@@ -43,6 +44,7 @@ module.exports = function (orm) {
4344
commit,
4445
dateAdded: new Date()
4546
})
47+
orm.writeSyncLog(EVENT_CONSTANT.ADDED_TO_QUEUE, commit._id)
4648
if (orm.getQueue('queue:send').length <= 1)
4749
orm.emit('queue:send')
4850
})
@@ -51,6 +53,7 @@ module.exports = function (orm) {
5153
const removedCommitUUID = _queueCommit.map((commit) => {
5254
return commit._id
5355
})
56+
orm.writeSyncLog(EVENT_CONSTANT.FINISH_SEND, removedCommitUUID)
5457
await orm(QUEUE_COMMIT_MODEL).deleteMany({ 'commit._id': { $in: removedCommitUUID } })
5558
})
5659

@@ -73,6 +76,11 @@ module.exports = function (orm) {
7376
if (disabled) return
7477
console.log('Try to send to master')
7578
const data = await orm(QUEUE_COMMIT_MODEL).find()
79+
try {
80+
orm.writeSyncLog(EVENT_CONSTANT.DO_SEND, data.map(item => item.commit._id))
81+
} catch (e) {
82+
console.error(e)
83+
}
7684
await orm.emit('transport:send', data.map(item => item.commit))
7785
}
7886

0 commit comments

Comments
 (0)