From 5c189cc64103de4838aaaaa0611a14db0100c603 Mon Sep 17 00:00:00 2001 From: Mohan Date: Tue, 2 Apr 2024 00:38:00 +0530 Subject: [PATCH 1/2] feat: :sparkles: implement worker thread mechanism to operate for 1brc --- calculate_average_mojjominion.sh | 18 ++ src/main/nodejs/mojjominion/.gitignore | 175 +++++++++++++ src/main/nodejs/mojjominion/index.js | 237 ++++++++++++++++++ src/main/nodejs/mojjominion/package-lock.json | 27 ++ src/main/nodejs/mojjominion/package.json | 7 + src/main/nodejs/mojjominion/tsconfig.json | 10 + src/main/nodejs/mojjominion/yarn.lock | 15 ++ 7 files changed, 489 insertions(+) create mode 100755 calculate_average_mojjominion.sh create mode 100644 src/main/nodejs/mojjominion/.gitignore create mode 100644 src/main/nodejs/mojjominion/index.js create mode 100644 src/main/nodejs/mojjominion/package-lock.json create mode 100644 src/main/nodejs/mojjominion/package.json create mode 100644 src/main/nodejs/mojjominion/tsconfig.json create mode 100644 src/main/nodejs/mojjominion/yarn.lock diff --git a/calculate_average_mojjominion.sh b/calculate_average_mojjominion.sh new file mode 100755 index 0000000..0830b67 --- /dev/null +++ b/calculate_average_mojjominion.sh @@ -0,0 +1,18 @@ +#!/bin/sh +# +# Copyright 2023 The original authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +time node src/main/nodejs/mojjominion/index.js measurements.txt diff --git a/src/main/nodejs/mojjominion/.gitignore b/src/main/nodejs/mojjominion/.gitignore new file mode 100644 index 0000000..468f82a --- /dev/null +++ b/src/main/nodejs/mojjominion/.gitignore @@ -0,0 +1,175 @@ +# Based on https://raw.githubusercontent.com/github/gitignore/main/Node.gitignore + +# Logs + +logs +_.log +npm-debug.log_ +yarn-debug.log* +yarn-error.log* +lerna-debug.log* +.pnpm-debug.log* + +# Caches + +.cache + +# Diagnostic reports (https://nodejs.org/api/report.html) + +report.[0-9]_.[0-9]_.[0-9]_.[0-9]_.json + +# Runtime data + +pids +_.pid +_.seed +*.pid.lock + +# Directory for instrumented libs generated by jscoverage/JSCover + +lib-cov + +# Coverage directory used by tools like istanbul + +coverage +*.lcov + +# nyc test coverage + +.nyc_output + +# Grunt intermediate storage (https://gruntjs.com/creating-plugins#storing-task-files) + +.grunt + +# Bower dependency directory (https://bower.io/) + +bower_components + +# node-waf configuration + +.lock-wscript + +# Compiled binary addons (https://nodejs.org/api/addons.html) + +build/Release + +# Dependency directories + +node_modules/ +jspm_packages/ + +# Snowpack dependency directory (https://snowpack.dev/) + +web_modules/ + +# TypeScript cache + +*.tsbuildinfo + +# Optional npm cache directory + +.npm + +# Optional eslint cache + +.eslintcache + +# Optional stylelint cache + +.stylelintcache + +# Microbundle cache + +.rpt2_cache/ +.rts2_cache_cjs/ +.rts2_cache_es/ +.rts2_cache_umd/ + +# Optional REPL history + +.node_repl_history + +# Output of 'npm pack' + +*.tgz + +# Yarn Integrity file + +.yarn-integrity + +# dotenv environment variable files + +.env +.env.development.local +.env.test.local +.env.production.local +.env.local + +# parcel-bundler cache (https://parceljs.org/) + +.parcel-cache + +# Next.js build output + +.next +out + +# Nuxt.js build / generate output + +.nuxt +dist + +# Gatsby files + +# Comment in the public line in if your project uses Gatsby and not Next.js + +# https://nextjs.org/blog/next-9-1#public-directory-support + +# public + +# vuepress build output + +.vuepress/dist + +# vuepress v2.x temp and cache directory + +.temp + +# Docusaurus cache and generated files + +.docusaurus + +# Serverless directories + +.serverless/ + +# FuseBox cache + +.fusebox/ + +# DynamoDB Local files + +.dynamodb/ + +# TernJS port file + +.tern-port + +# Stores VSCode versions used for testing VSCode extensions + +.vscode-test + +# yarn v2 + +.yarn/cache +.yarn/unplugged +.yarn/build-state.yml +.yarn/install-state.gz +.pnp.* + +# IntelliJ based IDEs +.idea + +# Finder (MacOS) folder config +.DS_Store diff --git a/src/main/nodejs/mojjominion/index.js b/src/main/nodejs/mojjominion/index.js new file mode 100644 index 0000000..4539cb0 --- /dev/null +++ b/src/main/nodejs/mojjominion/index.js @@ -0,0 +1,237 @@ +import * as fs from "fs"; +import * as fsp from "fs/promises"; +import * as os from 'os' +import { parentPort, workerData, Worker, isMainThread } from "worker_threads"; + +// Copied some snippets from the solution below +// CREDITS: https://github.com/1brc/nodejs/blob/main/src/main/nodejs/Edgar-P-yan/index.js + +const filePath = process.argv[2]; +const WORKERS = os.cpus().length; +const SEMI_COLON = ";".charCodeAt(0); +const NEW_LINE = "\n".charCodeAt(0); + + +/** + * @param {Function} fnc + */ +// async function perf(fnc) { +// console.time(fnc.name.toString()); +// await fnc(); +// console.timeEnd(fnc.name.toString()); +// } + +if (isMainThread) { + runMainThread(); +} else { + const { filePath: filename, start, end } = workerData; + readFileSection(+start, +end); + const map = new Map(); + /** + * @param {number} start + * @param {number} end + */ + function readFileSection(start, end) { + /** @type Buffer */ + let leftoverChunk = undefined; + const readStream = fs.createReadStream(filename, { start, end }); + readStream.on( + "data", + ( + /** @type Buffer */ + buffer + ) => { + let chunk = buffer; + if (leftoverChunk) { + chunk = Buffer.allocUnsafe(leftoverChunk.length + buffer.length); + leftoverChunk.copy(chunk, 0); + buffer.copy(chunk, leftoverChunk.length); + leftoverChunk = null; + } + let index = chunk.lastIndexOf(NEW_LINE); + if (index < chunk.length - 1) { + leftoverChunk = chunk.subarray(index + 1); + } + parseChunkLines(chunk.subarray(0, index), map); + }); + readStream.on("end", () => { + parentPort.postMessage(map); + }); + } +} + +/** + * @typedef {Object} MapEntry + * @property {number} sum - The sum of all temperatures. + * @property {number} count - The count of entries + * @property {number} min + * @property {number} max + */ +async function runMainThread() { + let promises = []; + const fileSize = (await fsp.stat(filePath)).size; + const fileChuckSize = Math.ceil(fileSize / WORKERS); + + async function getLinedChunks() { + const MAX_LINE_LENGTH = 100 + 1 + 4 + 1; + let offset = 0; + let buf = Buffer.alloc(MAX_LINE_LENGTH); + const chunkOffsets = [0]; + const file = await fsp.open(filePath); + while (true) { + offset += fileChuckSize; + if (offset >= fileSize) { + chunkOffsets.push(fileSize); + break; + } + await file.read(buf, 0, MAX_LINE_LENGTH, offset); + let index = buf.indexOf(NEW_LINE); + buf.fill(0); + if (index == -1) { + chunkOffsets.push(fileSize); + break; + } + offset += index + 1; + chunkOffsets.push(offset); + } + await file.close(); + return chunkOffsets; + } + + const chunks = await getLinedChunks(); + /** @type Map */ + const finalResult = new Map(); + for (let i = 0; i < chunks.length - 1; i++) { + const workerThread = new Worker( + new URL(import.meta.resolve('./index.js')), + { + workerData: { filePath, start: chunks[i], end: chunks[i + 1] }, + }); + const promise = new Promise((resolve) => { + workerThread.on( + "message", + ( + /** + * @type Map + */ + message + ) => { + // console.log( + // `Worker ${i + 1} finished reading ${message.size} items.` + // ); + for (let [key, value] of message.entries()) { + const existing = finalResult.get(key); + if (existing) { + existing.min = Math.min(existing.min, value.min); + existing.max = Math.max(existing.max, value.max); + existing.sum += value.sum; + existing.count += value.count; + } else { + finalResult.set(key, value); + } + } + resolve(); + } + ); + }); + promises.push(promise); + } + await Promise.all(promises); + printDB(finalResult); + // divertToFile(() => printDB(finalResult)); +} + +/** + * @param {Buffer} buff : '-22.3', '22.0', '2.2', '-0.2', + */ +function parseTemperature(buff) { + let temperature = 0; + let sign = buff[0] === "-".charCodeAt(0) ? -1 : 1; + for (let i = 0; i < buff.length; i++) { + if (buff[i] == ".".charCodeAt(0)) continue; + if (buff[i] == "-".charCodeAt(0)) continue; + temperature *= 10; + temperature += buff[i] - "0".charCodeAt(0); + } + return sign * temperature; +} + +// Function to extract lines from buffer chunk +/** + * @param {Buffer} chunk + * @param {Map} map + */ +function parseChunkLines(chunk, map) { + let start = 0; + for (let i = 0; i < chunk.length + 1; i++) { + if (chunk[i] !== NEW_LINE && i <= chunk.length - 1) continue; + let colonIndex = chunk.indexOf(SEMI_COLON, start); + let nameBuff = chunk.subarray(start, colonIndex); + let tempBuff = chunk.subarray(colonIndex + 1, i); + start = i + 1; + + let name = nameBuff.toString(); + let temperature = parseTemperature(tempBuff) + let existing = map.get(name); + if (existing) { + existing.count += 1; + existing.sum += temperature; + existing.min = Math.min(existing.min, temperature); + existing.max = Math.max(existing.max, temperature); + } else { + map.set(name, { count: 1, sum: temperature, min: temperature, max: temperature }); + } + } +} + +/** + * @example + * round(1.2345) // "1.2" + * round(1.55) // "1.6" + * round(1) // "1.0" + * + * @param {number} num + * @returns {string} + */ +function round(num) { + const fixed = Math.round(10 * num) / 10; + return fixed.toFixed(1); +} +/** + * @param {Function} fnc + */ +// function divertToFile(fnc) { +// let stream = fs.createWriteStream("indexout.txt"); +// let stdout = process.stdout.write; +// let stderr = process.stderr.write; +// process.stdout.write = process.stderr.write = stream.write.bind(stream); +// fnc(); +// process.stdout.write = stdout; +// process.stderr.write = stderr; +// stream.end(); +// } + +/** + * @param {Map} db + */ +export function printDB(db) { + const sortedStations = Array.from(db.keys()).sort(); + process.stdout.write("{"); + for (let i = 0; i < sortedStations.length; i++) { + if (i > 0) { + process.stdout.write(", "); + } + const data = db.get(sortedStations[i]); + process.stdout.write(sortedStations[i]); + process.stdout.write("="); + process.stdout.write( + round(data.min / 10) + + "/" + + round(data.sum / 10 / data.count) + + "/" + + round(data.max / 10) + ); + } + process.stdout.write("}\n"); +} + diff --git a/src/main/nodejs/mojjominion/package-lock.json b/src/main/nodejs/mojjominion/package-lock.json new file mode 100644 index 0000000..fc27bc0 --- /dev/null +++ b/src/main/nodejs/mojjominion/package-lock.json @@ -0,0 +1,27 @@ +{ + "name": "nodejs", + "lockfileVersion": 3, + "requires": true, + "packages": { + "": { + "devDependencies": { + "@types/node": "^20.10.6" + } + }, + "node_modules/@types/node": { + "version": "20.10.6", + "resolved": "https://registry.npmjs.org/@types/node/-/node-20.10.6.tgz", + "integrity": "sha512-Vac8H+NlRNNlAmDfGUP7b5h/KA+AtWIzuXy0E6OyP8f1tCLYAtPvKRRDJjAPqhpCb0t6U2j7/xqAuLEebW2kiw==", + "dev": true, + "dependencies": { + "undici-types": "~5.26.4" + } + }, + "node_modules/undici-types": { + "version": "5.26.5", + "resolved": "https://registry.npmjs.org/undici-types/-/undici-types-5.26.5.tgz", + "integrity": "sha512-JlCMO+ehdEIKqlFxk6IfVoAUVmgz7cU7zD/h9XZ0qzeosSHmUJVOzSQvvYSYWXkFXC+IfLKSIffhv0sVZup6pA==", + "dev": true + } + } +} diff --git a/src/main/nodejs/mojjominion/package.json b/src/main/nodejs/mojjominion/package.json new file mode 100644 index 0000000..cb4b32b --- /dev/null +++ b/src/main/nodejs/mojjominion/package.json @@ -0,0 +1,7 @@ +{ + "type": "module", + "module": "es2022", + "devDependencies": { + "@types/node": "^20.10.6" + } +} diff --git a/src/main/nodejs/mojjominion/tsconfig.json b/src/main/nodejs/mojjominion/tsconfig.json new file mode 100644 index 0000000..57fc6e6 --- /dev/null +++ b/src/main/nodejs/mojjominion/tsconfig.json @@ -0,0 +1,10 @@ +{ + "compilerOptions": { + "module": "es2022", + "target": "es2022", + "allowJs": true, + "checkJs": true, + "outDir": "dist", + "types": ["node"] + } +} diff --git a/src/main/nodejs/mojjominion/yarn.lock b/src/main/nodejs/mojjominion/yarn.lock new file mode 100644 index 0000000..c222830 --- /dev/null +++ b/src/main/nodejs/mojjominion/yarn.lock @@ -0,0 +1,15 @@ +# THIS IS AN AUTOGENERATED FILE. DO NOT EDIT THIS FILE DIRECTLY. +# yarn lockfile v1 + + +"@types/node@^20.10.6": + version "20.12.2" + resolved "https://registry.yarnpkg.com/@types/node/-/node-20.12.2.tgz#9facdd11102f38b21b4ebedd9d7999663343d72e" + integrity sha512-zQ0NYO87hyN6Xrclcqp7f8ZbXNbRfoGWNcMvHTPQp9UUrwI0mI7XBz+cu7/W6/VClYo2g63B0cjull/srU7LgQ== + dependencies: + undici-types "~5.26.4" + +undici-types@~5.26.4: + version "5.26.5" + resolved "https://registry.yarnpkg.com/undici-types/-/undici-types-5.26.5.tgz#bcd539893d00b56e964fd2657a4866b221a65617" + integrity sha512-JlCMO+ehdEIKqlFxk6IfVoAUVmgz7cU7zD/h9XZ0qzeosSHmUJVOzSQvvYSYWXkFXC+IfLKSIffhv0sVZup6pA== From 241650f8b1fe200fd35423cc332523424b5d80ea Mon Sep 17 00:00:00 2001 From: Mohan Date: Tue, 2 Apr 2024 00:41:01 +0530 Subject: [PATCH 2/2] feat: :lipstick: replace diff with git diff --color-words --- test.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test.sh b/test.sh index 4bec036..4a15820 100755 --- a/test.sh +++ b/test.sh @@ -28,6 +28,6 @@ for sample in $(ls src/test/resources/samples/*.txt); do rm -f measurements.txt ln -s $sample measurements.txt - diff <("./calculate_average_$1.sh") ${sample%.txt}.out + git diff --color-words <("./calculate_average_$1.sh") ${sample%.txt}.out done rm measurements.txt