// multi.js
const { Worker } = require('worker_threads');
const os = require('os');
const fs = require('fs');
const stream = require('stream');
const cpus = os.cpus().length;
let finishedReading = false; // set once the input stream has been fully consumed
let found = false;           // result flag used by the commented-out search variant below
let onlineCount = 0;         // number of workers that have emitted 'online'
let roundCount = 0;          // round-robin index into the worker pool
let busyCount = 0;           // chunks currently being processed (commented-out variant)
const pool = Array.from({ length: cpus }).map(() => {
  const worker = new Worker('./multiply.js');
  // used to start the real program when all the threads are ready
  worker.on('online', () => {
    onlineCount++;
    if (onlineCount === cpus) start();
  });
  worker.on('message', async message => {
    messagesDispatcher(message.id, message.data);
    /* if (message.type === 'result') {
      busyCount--;
      if (message.found) found = true;
      if (finishedReading && busyCount === 0) {
        if (found) console.log('found :D');
        else console.log('not found :(');
        await Promise.all(pool.map(w => w.terminate()));
      }
    } */
  });
  return worker;
});
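// For reference, a minimal sketch of what './multiply.js' could look like on the worker
// side, assuming each worker receives { chunk, id } and answers with { id, data } as the
// message handler above expects (the real file is not shown here, so this is only a guess):
/*
const { parentPort } = require('worker_threads');
parentPort.on('message', ({ chunk, id }) => {
  // stand-in for the real CPU-bound work done on the chunk
  const data = Buffer.from(chunk).toString().toUpperCase();
  parentPort.postMessage({ id, data });
});
*/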
/* shape of the per-chunk descriptors (the array index acts as the id):
const messages = [{
  data,
  handler,
  status: 'pending' | 'ready' | 'sent'
}] */
let messagesDispatcher;
const transformStream = () => {
  const messages = [];
  let sentCount = 0;
  const dispatch = messages => (id, data) => {
    console.log(messages.map((msg, index) => `${index} - ${msg.status}`).join('\n'));
    console.log('DISPATCHING', id);
    const message = messages[id];
    if (!message) return;
    console.log('STATUS', message.status, data ? data.length : -1);
    if (id === 0) {
      console.log('SENDING OUT', id, typeof messages[id + 1]);
      message.handler(data);
      message.status = 'sent';
      messages[id + 1] && dispatch(messages)(id + 1, messages[id + 1].data);
      sentCount++;
    } else {
      const prevMessage = messages[id - 1];
      if (prevMessage.status === 'sent' && message.status !== 'sent' && data && data.length > 0) {
        console.log('SENDING OUT', id, typeof messages[id + 1], data.length);
        message.handler(data);
        message.status = 'sent';
        messages[id + 1] && dispatch(messages)(id + 1, messages[id + 1].data);
        sentCount++;
      } else {
        message.status = 'ready';
        message.data = data;
      }
    }
    // Check whether the previous message has already been sent:
    // - if it has not, this one is stored with a 'ready' status so it can be flushed later;
    // - if it has, this message is sent and dispatch cascades to the following 'ready'
    //   messages until it finds one that is still 'pending' (or already 'sent').
  };
  messagesDispatcher = dispatch(messages);
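  // A small, hypothetical illustration of the in-order flushing above (not executed):
  // if chunk 1 comes back from a worker before chunk 0, it is parked as 'ready' and only
  // flushed once chunk 0 has gone out.
  /*
  messages.push({ data: [], status: 'pending', handler: d => console.log('flush 0', d) });
  messages.push({ data: [], status: 'pending', handler: d => console.log('flush 1', d) });
  messagesDispatcher(1, 'chunk one');  // out of order: stored with status 'ready'
  messagesDispatcher(0, 'chunk zero'); // flushes 0, then cascades and flushes 1 as well
  */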
  // TODO: a Transform stream does not work here because next() pushes the data into the
  // readable side and only then asks for the next chunk, so the work cannot be parallelised.
  /* const ts = new stream.Transform({
    transform(chunk, encoding, next) {
      console.log('WRITING');
      const id = messages.length;
      messages.push({
        data: [],
        status: 'pending',
        handler: data => next(null, data)
      });
      pool[roundCount % cpus].postMessage({ chunk, id });
    }
  }); */
  // In this case the thread pool must take care of the data storage:
  // when a worker sends back the processed data, the thread pool callback must save it to
  // some sort of storage, and it must save the data in series (this could also be user defined).
  const ds = new stream.Duplex({
    // output side
    read(size) {
      // request data from a source (fs? db? network? maybe user defined);
      // size must be tracked with a counter so the thread pool callback knows the next offset
    },
    // input side
    write(chunk, encoding, next) {
      // send the chunk to the thread pool and call next() immediately
    }
  });
  return ds;
};
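// A possible way to wire the Duplex above to the pool, following the notes in the comments
// (write → round-robin postMessage, worker reply → dispatcher flushes in order by pushing
// onto the readable side). This is only a sketch of that idea, not the final design:
/*
// inside transformStream(), filling in the empty read()/write() above:
read(size) {
  // nothing to do here: processed chunks are pushed by each message's handler
},
write(chunk, encoding, next) {
  const id = messages.length;
  // when the worker answers, messagesDispatcher calls this handler in order
  messages.push({ data: [], status: 'pending', handler: data => this.push(data) });
  pool[id % cpus].postMessage({ chunk, id });
  next(); // hand the chunk off and immediately ask for the next one
}
*/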
const start = () => {
  const inStream = fs.createReadStream('./data3.txt');
  /* const tStream = () => new stream.Transform({
    transform(chunk, e, next) {
      console.log('reading chunk');
      this.push(chunk);
      next();
    }
  }); */
  // .pipe() returns the destination, and process.stdout never emits 'end', so the
  // listener has to go on the readable side of the duplex instead.
  const ts = transformStream();
  inStream.pipe(ts).pipe(process.stdout);
  ts.on('end', async () => {
    await Promise.all(pool.map(w => w.terminate()));
  });
  /* inStream.on('data', data => {
    pool[roundCount % cpus].postMessage(data);
    busyCount++;
    roundCount++;
  });
  inStream.on('end', () => {
    finishedReading = true;
  }); */
};
process.on('SIGINT', async () => await Promise.all(pool.map(w => w.terminate())));
process.on('SIGTERM', async () => await Promise.all(pool.map(w => w.terminate())));
/* const fs = require('fs');
const stream = fs.createReadStream('./data.txt');
stream.on('data', data => {
  console.log(data.toString().length);
});
fs.createReadStream(filePath).pipe(job(processor)).pipe(handler);
job(processor, { stream }).pipe(handler) */
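// The last two commented-out lines hint at a possible public API. A rough, hypothetical
// sketch of what such a job() factory might look like (job, processor and handler are not
// defined anywhere in this file, so the names and signature below are assumptions):
/*
const job = (processor, options = {}) => {
  // build a duplex backed by the worker pool that runs `processor` on every chunk
  // while preserving chunk order, e.g. by reusing transformStream() internally
  return transformStream();
};
*/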