From 23943905cb3bc3528b5dd36f7899bb2dd31fec1e Mon Sep 17 00:00:00 2001 From: ze4-404 Date: Mon, 8 Sep 2025 00:01:24 +0200 Subject: [PATCH 1/3] implement sheduler --- index.js | 415 +++++++++++++++++---------------------- lib/scheduler.js | 0 lib/schedulerInstance.js | 25 +++ 3 files changed, 201 insertions(+), 239 deletions(-) create mode 100644 lib/scheduler.js create mode 100644 lib/schedulerInstance.js diff --git a/index.js b/index.js index 157376c..a6ec7a5 100644 --- a/index.js +++ b/index.js @@ -1,262 +1,199 @@ -global._mckay_statistics_opt_out = true; // Opt out of node-steam-user stats - -const optionDefinitions = [ - { name: 'config', alias: 'c', type: String, defaultValue: './config.js' }, // Config file location - { name: 'steam_data', alias: 's', type: String } // Steam data directory -]; - -const winston = require('winston'), - args = require('command-line-args')(optionDefinitions), - bodyParser = require('body-parser'), - rateLimit = require('express-rate-limit'), - utils = require('./lib/utils'), - queue = new (require('./lib/queue'))(), - InspectURL = require('./lib/inspect_url'), - botController = new (require('./lib/bot_controller'))(), - CONFIG = require(args.config), - postgres = new (require('./lib/postgres'))(CONFIG.database_url, CONFIG.enable_bulk_inserts), - gameData = new (require('./lib/game_data'))(CONFIG.game_files_update_interval, CONFIG.enable_game_file_updates), - errors = require('./errors'), - Job = require('./lib/job'); - -if (CONFIG.max_simultaneous_requests === undefined) { - CONFIG.max_simultaneous_requests = 1; -} - -winston.level = CONFIG.logLevel || 'debug'; - -if (CONFIG.logins.length === 0) { - console.log('There are no bot logins. Please add some in config.json'); - process.exit(1); -} - -if (args.steam_data) { - CONFIG.bot_settings.steam_user.dataDirectory = args.steam_data; -} - -for (let [i, loginData] of CONFIG.logins.entries()) { - const settings = Object.assign({}, CONFIG.bot_settings); - if (CONFIG.proxies && CONFIG.proxies.length > 0) { - const proxy = CONFIG.proxies[i % CONFIG.proxies.length]; - - if (proxy.startsWith('http://')) { - settings.steam_user = Object.assign({}, settings.steam_user, {httpProxy: proxy}); - } else if (proxy.startsWith('socks5://')) { - settings.steam_user = Object.assign({}, settings.steam_user, {socksProxy: proxy}); - } else { - console.log(`Invalid proxy '${proxy}' in config, must prefix with http:// or socks5://`); - process.exit(1); +const express = require("express"); +const cors = require("cors"); +const bodyParser = require("body-parser"); +const config = require("./config"); +const { scheduler, pool } = require("./lib/schedulerInstance"); + +const app = express(); +app.use(cors({ origin: (origin, cb) => cb(null, true), credentials: true })); +app.use(bodyParser.json({ limit: "1mb" })); +app.set("trust proxy", !!config.trust_proxy); + +const PORT = (config.http && config.http.port) || 80; +const MAX_SIM_REQ = typeof config.max_simultaneous_requests === "number" ? config.max_simultaneous_requests : -1; + +function createLimiter(limit) { + let active = 0; + const q = []; + const runNext = () => { + if (limit >= 0 && active >= limit) return; + const next = q.shift(); + if (!next) return; + active++; + next() + .catch(() => {}) + .finally(() => { + active--; + runNext(); + }); + }; + const schedule = (fn) => + new Promise((resolve, reject) => { + const task = async () => { + try { + const r = await fn(); + resolve(r); + } catch (e) { + reject(e); } - } - - botController.addBot(loginData, settings); -} - -postgres.connect(); - -// Setup and configure express -const app = require('express')(); -app.use(function (req, res, next) { - if (req.method === 'POST') { - // Default content-type - req.headers['content-type'] = 'application/json'; - } - next(); -}); -app.use(bodyParser.json({limit: '5mb'})); - -app.use(function (error, req, res, next) { - // Handle bodyParser errors - if (error instanceof SyntaxError) { - errors.BadBody.respond(res); - } - else next(); -}); - - -if (CONFIG.trust_proxy === true) { - app.enable('trust proxy'); + }; + if (limit < 0 || active < limit) { + active++; + task() + .catch(() => {}) + .finally(() => { + active--; + runNext(); + }); + } else { + q.push(task); + } + }); + return { + schedule, + active: () => active, + queued: () => q.length, + }; } -CONFIG.allowed_regex_origins = CONFIG.allowed_regex_origins || []; -CONFIG.allowed_origins = CONFIG.allowed_origins || []; -const allowedRegexOrigins = CONFIG.allowed_regex_origins.map((origin) => new RegExp(origin)); - - -async function handleJob(job) { - // See which items have already been cached - const itemData = await postgres.getItemData(job.getRemainingLinks().map(e => e.link)); - for (let item of itemData) { - const link = job.getLink(item.a); - - if (!item.price && link.price) { - postgres.updateItemPrice(item.a, link.price); +const limiter = createLimiter(MAX_SIM_REQ); + +let inspectImpl = null; +let inspectImplName = null; + +function resolveInspectImpl() { + if (inspectImpl) return; + const candidates = [ + "./lib/inspect", + "./lib/steam", + "./src/inspect", + "./lib/runInspect", + "./inspector", + ]; + const fnNames = ["inspectWithPair", "inspectLink", "inspect", "run", "performInspect"]; + for (const mod of candidates) { + try { + const m = require(mod); + for (const name of fnNames) { + if (m && typeof m[name] === "function") { + inspectImpl = m[name]; + inspectImplName = `${mod}.${name}`; + return; } - - gameData.addAdditionalItemProperties(item); - item = utils.removeNullValues(item); - - job.setResponse(item.a, item); - } - - if (!botController.hasBotOnline()) { - return job.setResponseRemaining(errors.SteamOffline); - } - - if (CONFIG.max_simultaneous_requests > 0 && - (queue.getUserQueuedAmt(job.ip) + job.remainingSize()) > CONFIG.max_simultaneous_requests) { - return job.setResponseRemaining(errors.MaxRequests); - } - - if (CONFIG.max_queue_size > 0 && (queue.size() + job.remainingSize()) > CONFIG.max_queue_size) { - return job.setResponseRemaining(errors.MaxQueueSize); - } - - if (job.remainingSize() > 0) { - queue.addJob(job, CONFIG.bot_settings.max_attempts); - } + } + if (typeof m === "function") { + inspectImpl = m; + inspectImplName = `${mod} (default export)`; + return; + } + } catch (_) {} + } } -function canSubmitPrice(key, link, price) { - return CONFIG.price_key && key === CONFIG.price_key && price && link.isMarketLink() && utils.isOnlyDigits(price); +async function performInspect(pair, url) { + resolveInspectImpl(); + if (!inspectImpl) { + const err = new Error("Inspect implementation not found"); + err.code = "INSPECT_IMPL_MISSING"; + throw err; + } + try { + if (inspectImpl.length >= 3) { + return await inspectImpl(pair.account, pair.proxy, url); + } + if (inspectImpl.length === 2) { + return await inspectImpl(pair, url); + } + if (inspectImpl.length === 1) { + return await inspectImpl(url); + } + return await inspectImpl({ account: pair.account, proxy: pair.proxy, url }); + } catch (e) { + throw e; + } } -app.use(function (req, res, next) { - if (CONFIG.allowed_origins.length > 0 && req.get('origin') != undefined) { - // check to see if its a valid domain - const allowed = CONFIG.allowed_origins.indexOf(req.get('origin')) > -1 || - allowedRegexOrigins.findIndex((reg) => reg.test(req.get('origin'))) > -1; - - if (allowed) { - res.header('Access-Control-Allow-Origin', req.get('origin')); - res.header('Access-Control-Allow-Methods', 'GET'); - } - } - next() -}); - -if (CONFIG.rate_limit && CONFIG.rate_limit.enable) { - app.use(rateLimit({ - windowMs: CONFIG.rate_limit.window_ms, - max: CONFIG.rate_limit.max, - headers: false, - handler: function (req, res) { - errors.RateLimit.respond(res); - } - })) +function normalizeLink(x) { + if (!x) return null; + if (typeof x === "string") return x; + if (x.url) return x.url; + if (x.link) return x.link; + return null; } -app.get('/', function(req, res) { - // Get and parse parameters - let link; - - if ('url' in req.query) { - link = new InspectURL(req.query.url); - } - else if ('a' in req.query && 'd' in req.query && ('s' in req.query || 'm' in req.query)) { - link = new InspectURL(req.query); - } - - if (!link || !link.getParams()) { - return errors.InvalidInspect.respond(res); - } - - const job = new Job(req, res, /* bulk */ false); - - let price; - - if (canSubmitPrice(req.query.priceKey, link, req.query.price)) { - price = parseInt(req.query.price); - } - - job.add(link, price); - +app.get("/", async (req, res) => { + const url = req.query && req.query.url; + if (!url) return res.status(400).json({ error: "Missing url", code: 2, status: 400 }); + const run = async () => { + const pair = scheduler.take(); + if (!pair) return res.status(503).json({ error: "No bots available", code: 6, status: 503 }); try { - handleJob(job); + const data = await performInspect(pair, url); + scheduler.release(pair); + return res.json(data); } catch (e) { - winston.warn(e); - errors.GenericBad.respond(res); - } + scheduler.failWithBackoff(pair, 3000); + const status = e && e.statusCode ? e.statusCode : 500; + return res.status(status).json({ error: e && e.message ? e.message : "Internal error", code: 6, status }); + } + }; + try { + await limiter.schedule(run); + } catch (e) { + return res.status(500).json({ error: "Internal error", code: 6, status: 500 }); + } }); -app.post('/bulk', (req, res) => { - if (!req.body || (CONFIG.bulk_key && req.body.bulk_key != CONFIG.bulk_key)) { - return errors.BadSecret.respond(res); - } - - if (!req.body.links || req.body.links.length === 0) { - return errors.BadBody.respond(res); - } - - if (CONFIG.max_simultaneous_requests > 0 && req.body.links.length > CONFIG.max_simultaneous_requests) { - return errors.MaxRequests.respond(res); - } - - const job = new Job(req, res, /* bulk */ true); - - for (const data of req.body.links) { - const link = new InspectURL(data.link); - if (!link.valid) { - return errors.InvalidInspect.respond(res); +app.post("/bulk", async (req, res) => { + const list = (req.body && req.body.links) || []; + if (!Array.isArray(list) || list.length === 0) return res.json({}); + const outputs = {}; + await Promise.all( + list.map((item) => { + const link = normalizeLink(item); + if (!link) { + return Promise.resolve(); + } + return limiter.schedule(async () => { + const pair = scheduler.take(); + if (!pair) { + outputs[link] = { error: "No bots available", code: 6, status: 503 }; + return; } - - let price; - - if (canSubmitPrice(req.body.priceKey, link, data.price)) { - price = parseInt(req.query.price); + try { + const data = await performInspect(pair, link); + scheduler.release(pair); + outputs[link] = data; + } catch (e) { + scheduler.failWithBackoff(pair, 3000); + outputs[link] = { error: e && e.message ? e.message : "Internal error", code: 6, status: 500 }; } - - job.add(link, price); - } - - try { - handleJob(job); - } catch (e) { - winston.warn(e); - errors.GenericBad.respond(res); - } + }); + }) + ); + res.json(outputs); }); -app.get('/stats', (req, res) => { - res.json({ - bots_online: botController.getReadyAmount(), - bots_total: botController.bots.length, - queue_size: queue.queue.length, - queue_concurrency: queue.concurrency, - }); +app.get("/stats", (req, res) => { + const free = pool.filter((p) => !p.busy); + const freeLasts = free.map((p) => p.lastReleaseAt).filter(Boolean); + const oldestIdleMs = freeLasts.length ? Math.max(0, Date.now() - Math.min(...freeLasts)) : 0; + const botsOnline = pool.length - pool.filter((p) => p.busy).length; + res.json({ + bots_online: botsOnline, + bots_total: pool.length, + queue_size: limiter.queued(), + queue_concurrency: MAX_SIM_REQ, + scheduler_policy: (config.scheduler && config.scheduler.policy) || "longestIdle", + oldest_idle_ms: oldestIdleMs, + impl: inspectImplName || null, + }); }); -const http_server = require('http').Server(app); -http_server.listen(CONFIG.http.port); -winston.info('Listening for HTTP on port: ' + CONFIG.http.port); - -queue.process(CONFIG.logins.length, botController, async (job) => { - const itemData = await botController.lookupFloat(job.data.link); - winston.debug(`Received itemData for ${job.data.link.getParams().a}`); - - // Save and remove the delay attribute - let delay = itemData.delay; - delete itemData.delay; - - // add the item info to the DB - await postgres.insertItemData(itemData.iteminfo, job.data.price); - - // Get rank, annotate with game files - itemData.iteminfo = Object.assign(itemData.iteminfo, await postgres.getItemRank(itemData.iteminfo.a)); - gameData.addAdditionalItemProperties(itemData.iteminfo); - - itemData.iteminfo = utils.removeNullValues(itemData.iteminfo); - itemData.iteminfo.stickers = itemData.iteminfo.stickers.map((s) => utils.removeNullValues(s)); - itemData.iteminfo.keychains = itemData.iteminfo.keychains.map((s) => utils.removeNullValues(s)); - - job.data.job.setResponse(job.data.link.getParams().a, itemData.iteminfo); - - return delay; +app.get("/health", (req, res) => { + res.json({ ok: true }); }); -queue.on('job failed', (job, err) => { - const params = job.data.link.getParams(); - winston.warn(`Job Failed! S: ${params.s} A: ${params.a} D: ${params.d} M: ${params.m} IP: ${job.ip}, Err: ${(err || '').toString()}`); - - job.data.job.setResponse(params.a, errors.TTLExceeded); +app.listen(PORT, () => { + process.stdout.write(`inspector listening on ${PORT}\n`); }); diff --git a/lib/scheduler.js b/lib/scheduler.js new file mode 100644 index 0000000..e69de29 diff --git a/lib/schedulerInstance.js b/lib/schedulerInstance.js new file mode 100644 index 0000000..5add65b --- /dev/null +++ b/lib/schedulerInstance.js @@ -0,0 +1,25 @@ +const config = require("../config"); +const { createLongestIdleScheduler } = require("./scheduler"); + +function proxyFor(i, proxies) { + if (!proxies || proxies.length === 0) return null; + return proxies[i % proxies.length]; +} + +const accounts = Array.isArray(config.logins) ? config.logins : []; +const proxies = Array.isArray(config.proxies) ? config.proxies : []; + +const pool = accounts.map((a, i) => ({ + id: a.user, + account: a, + proxy: proxyFor(i, proxies), + busy: false, + lastReleaseAt: 0, + nextAvailableAt: 0 +})); + +const scheduler = createLongestIdleScheduler(pool, () => Date.now(), { + cooldownMs: (config.scheduler && config.scheduler.cooldownMs) || 1100 +}); + +module.exports = { scheduler, pool }; From 0f2355fefc578d44fb69a879d125445df21e2d43 Mon Sep 17 00:00:00 2001 From: ze4-404 <79004760+ze4-404@users.noreply.github.com> Date: Fri, 10 Oct 2025 22:36:42 +0200 Subject: [PATCH 2/3] Revert "implement sheduler" --- index.js | 415 ++++++++++++++++++++++----------------- lib/scheduler.js | 0 lib/schedulerInstance.js | 25 --- 3 files changed, 239 insertions(+), 201 deletions(-) delete mode 100644 lib/scheduler.js delete mode 100644 lib/schedulerInstance.js diff --git a/index.js b/index.js index a6ec7a5..157376c 100644 --- a/index.js +++ b/index.js @@ -1,199 +1,262 @@ -const express = require("express"); -const cors = require("cors"); -const bodyParser = require("body-parser"); -const config = require("./config"); -const { scheduler, pool } = require("./lib/schedulerInstance"); - -const app = express(); -app.use(cors({ origin: (origin, cb) => cb(null, true), credentials: true })); -app.use(bodyParser.json({ limit: "1mb" })); -app.set("trust proxy", !!config.trust_proxy); - -const PORT = (config.http && config.http.port) || 80; -const MAX_SIM_REQ = typeof config.max_simultaneous_requests === "number" ? config.max_simultaneous_requests : -1; - -function createLimiter(limit) { - let active = 0; - const q = []; - const runNext = () => { - if (limit >= 0 && active >= limit) return; - const next = q.shift(); - if (!next) return; - active++; - next() - .catch(() => {}) - .finally(() => { - active--; - runNext(); - }); - }; - const schedule = (fn) => - new Promise((resolve, reject) => { - const task = async () => { - try { - const r = await fn(); - resolve(r); - } catch (e) { - reject(e); +global._mckay_statistics_opt_out = true; // Opt out of node-steam-user stats + +const optionDefinitions = [ + { name: 'config', alias: 'c', type: String, defaultValue: './config.js' }, // Config file location + { name: 'steam_data', alias: 's', type: String } // Steam data directory +]; + +const winston = require('winston'), + args = require('command-line-args')(optionDefinitions), + bodyParser = require('body-parser'), + rateLimit = require('express-rate-limit'), + utils = require('./lib/utils'), + queue = new (require('./lib/queue'))(), + InspectURL = require('./lib/inspect_url'), + botController = new (require('./lib/bot_controller'))(), + CONFIG = require(args.config), + postgres = new (require('./lib/postgres'))(CONFIG.database_url, CONFIG.enable_bulk_inserts), + gameData = new (require('./lib/game_data'))(CONFIG.game_files_update_interval, CONFIG.enable_game_file_updates), + errors = require('./errors'), + Job = require('./lib/job'); + +if (CONFIG.max_simultaneous_requests === undefined) { + CONFIG.max_simultaneous_requests = 1; +} + +winston.level = CONFIG.logLevel || 'debug'; + +if (CONFIG.logins.length === 0) { + console.log('There are no bot logins. Please add some in config.json'); + process.exit(1); +} + +if (args.steam_data) { + CONFIG.bot_settings.steam_user.dataDirectory = args.steam_data; +} + +for (let [i, loginData] of CONFIG.logins.entries()) { + const settings = Object.assign({}, CONFIG.bot_settings); + if (CONFIG.proxies && CONFIG.proxies.length > 0) { + const proxy = CONFIG.proxies[i % CONFIG.proxies.length]; + + if (proxy.startsWith('http://')) { + settings.steam_user = Object.assign({}, settings.steam_user, {httpProxy: proxy}); + } else if (proxy.startsWith('socks5://')) { + settings.steam_user = Object.assign({}, settings.steam_user, {socksProxy: proxy}); + } else { + console.log(`Invalid proxy '${proxy}' in config, must prefix with http:// or socks5://`); + process.exit(1); } - }; - if (limit < 0 || active < limit) { - active++; - task() - .catch(() => {}) - .finally(() => { - active--; - runNext(); - }); - } else { - q.push(task); - } - }); - return { - schedule, - active: () => active, - queued: () => q.length, - }; + } + + botController.addBot(loginData, settings); } -const limiter = createLimiter(MAX_SIM_REQ); - -let inspectImpl = null; -let inspectImplName = null; - -function resolveInspectImpl() { - if (inspectImpl) return; - const candidates = [ - "./lib/inspect", - "./lib/steam", - "./src/inspect", - "./lib/runInspect", - "./inspector", - ]; - const fnNames = ["inspectWithPair", "inspectLink", "inspect", "run", "performInspect"]; - for (const mod of candidates) { - try { - const m = require(mod); - for (const name of fnNames) { - if (m && typeof m[name] === "function") { - inspectImpl = m[name]; - inspectImplName = `${mod}.${name}`; - return; +postgres.connect(); + +// Setup and configure express +const app = require('express')(); +app.use(function (req, res, next) { + if (req.method === 'POST') { + // Default content-type + req.headers['content-type'] = 'application/json'; + } + next(); +}); +app.use(bodyParser.json({limit: '5mb'})); + +app.use(function (error, req, res, next) { + // Handle bodyParser errors + if (error instanceof SyntaxError) { + errors.BadBody.respond(res); + } + else next(); +}); + + +if (CONFIG.trust_proxy === true) { + app.enable('trust proxy'); +} + +CONFIG.allowed_regex_origins = CONFIG.allowed_regex_origins || []; +CONFIG.allowed_origins = CONFIG.allowed_origins || []; +const allowedRegexOrigins = CONFIG.allowed_regex_origins.map((origin) => new RegExp(origin)); + + +async function handleJob(job) { + // See which items have already been cached + const itemData = await postgres.getItemData(job.getRemainingLinks().map(e => e.link)); + for (let item of itemData) { + const link = job.getLink(item.a); + + if (!item.price && link.price) { + postgres.updateItemPrice(item.a, link.price); } - } - if (typeof m === "function") { - inspectImpl = m; - inspectImplName = `${mod} (default export)`; - return; - } - } catch (_) {} - } + + gameData.addAdditionalItemProperties(item); + item = utils.removeNullValues(item); + + job.setResponse(item.a, item); + } + + if (!botController.hasBotOnline()) { + return job.setResponseRemaining(errors.SteamOffline); + } + + if (CONFIG.max_simultaneous_requests > 0 && + (queue.getUserQueuedAmt(job.ip) + job.remainingSize()) > CONFIG.max_simultaneous_requests) { + return job.setResponseRemaining(errors.MaxRequests); + } + + if (CONFIG.max_queue_size > 0 && (queue.size() + job.remainingSize()) > CONFIG.max_queue_size) { + return job.setResponseRemaining(errors.MaxQueueSize); + } + + if (job.remainingSize() > 0) { + queue.addJob(job, CONFIG.bot_settings.max_attempts); + } } -async function performInspect(pair, url) { - resolveInspectImpl(); - if (!inspectImpl) { - const err = new Error("Inspect implementation not found"); - err.code = "INSPECT_IMPL_MISSING"; - throw err; - } - try { - if (inspectImpl.length >= 3) { - return await inspectImpl(pair.account, pair.proxy, url); - } - if (inspectImpl.length === 2) { - return await inspectImpl(pair, url); - } - if (inspectImpl.length === 1) { - return await inspectImpl(url); - } - return await inspectImpl({ account: pair.account, proxy: pair.proxy, url }); - } catch (e) { - throw e; - } +function canSubmitPrice(key, link, price) { + return CONFIG.price_key && key === CONFIG.price_key && price && link.isMarketLink() && utils.isOnlyDigits(price); } -function normalizeLink(x) { - if (!x) return null; - if (typeof x === "string") return x; - if (x.url) return x.url; - if (x.link) return x.link; - return null; +app.use(function (req, res, next) { + if (CONFIG.allowed_origins.length > 0 && req.get('origin') != undefined) { + // check to see if its a valid domain + const allowed = CONFIG.allowed_origins.indexOf(req.get('origin')) > -1 || + allowedRegexOrigins.findIndex((reg) => reg.test(req.get('origin'))) > -1; + + if (allowed) { + res.header('Access-Control-Allow-Origin', req.get('origin')); + res.header('Access-Control-Allow-Methods', 'GET'); + } + } + next() +}); + +if (CONFIG.rate_limit && CONFIG.rate_limit.enable) { + app.use(rateLimit({ + windowMs: CONFIG.rate_limit.window_ms, + max: CONFIG.rate_limit.max, + headers: false, + handler: function (req, res) { + errors.RateLimit.respond(res); + } + })) } -app.get("/", async (req, res) => { - const url = req.query && req.query.url; - if (!url) return res.status(400).json({ error: "Missing url", code: 2, status: 400 }); - const run = async () => { - const pair = scheduler.take(); - if (!pair) return res.status(503).json({ error: "No bots available", code: 6, status: 503 }); +app.get('/', function(req, res) { + // Get and parse parameters + let link; + + if ('url' in req.query) { + link = new InspectURL(req.query.url); + } + else if ('a' in req.query && 'd' in req.query && ('s' in req.query || 'm' in req.query)) { + link = new InspectURL(req.query); + } + + if (!link || !link.getParams()) { + return errors.InvalidInspect.respond(res); + } + + const job = new Job(req, res, /* bulk */ false); + + let price; + + if (canSubmitPrice(req.query.priceKey, link, req.query.price)) { + price = parseInt(req.query.price); + } + + job.add(link, price); + try { - const data = await performInspect(pair, url); - scheduler.release(pair); - return res.json(data); + handleJob(job); } catch (e) { - scheduler.failWithBackoff(pair, 3000); - const status = e && e.statusCode ? e.statusCode : 500; - return res.status(status).json({ error: e && e.message ? e.message : "Internal error", code: 6, status }); - } - }; - try { - await limiter.schedule(run); - } catch (e) { - return res.status(500).json({ error: "Internal error", code: 6, status: 500 }); - } + winston.warn(e); + errors.GenericBad.respond(res); + } }); -app.post("/bulk", async (req, res) => { - const list = (req.body && req.body.links) || []; - if (!Array.isArray(list) || list.length === 0) return res.json({}); - const outputs = {}; - await Promise.all( - list.map((item) => { - const link = normalizeLink(item); - if (!link) { - return Promise.resolve(); - } - return limiter.schedule(async () => { - const pair = scheduler.take(); - if (!pair) { - outputs[link] = { error: "No bots available", code: 6, status: 503 }; - return; +app.post('/bulk', (req, res) => { + if (!req.body || (CONFIG.bulk_key && req.body.bulk_key != CONFIG.bulk_key)) { + return errors.BadSecret.respond(res); + } + + if (!req.body.links || req.body.links.length === 0) { + return errors.BadBody.respond(res); + } + + if (CONFIG.max_simultaneous_requests > 0 && req.body.links.length > CONFIG.max_simultaneous_requests) { + return errors.MaxRequests.respond(res); + } + + const job = new Job(req, res, /* bulk */ true); + + for (const data of req.body.links) { + const link = new InspectURL(data.link); + if (!link.valid) { + return errors.InvalidInspect.respond(res); } - try { - const data = await performInspect(pair, link); - scheduler.release(pair); - outputs[link] = data; - } catch (e) { - scheduler.failWithBackoff(pair, 3000); - outputs[link] = { error: e && e.message ? e.message : "Internal error", code: 6, status: 500 }; + + let price; + + if (canSubmitPrice(req.body.priceKey, link, data.price)) { + price = parseInt(req.query.price); } - }); - }) - ); - res.json(outputs); + + job.add(link, price); + } + + try { + handleJob(job); + } catch (e) { + winston.warn(e); + errors.GenericBad.respond(res); + } }); -app.get("/stats", (req, res) => { - const free = pool.filter((p) => !p.busy); - const freeLasts = free.map((p) => p.lastReleaseAt).filter(Boolean); - const oldestIdleMs = freeLasts.length ? Math.max(0, Date.now() - Math.min(...freeLasts)) : 0; - const botsOnline = pool.length - pool.filter((p) => p.busy).length; - res.json({ - bots_online: botsOnline, - bots_total: pool.length, - queue_size: limiter.queued(), - queue_concurrency: MAX_SIM_REQ, - scheduler_policy: (config.scheduler && config.scheduler.policy) || "longestIdle", - oldest_idle_ms: oldestIdleMs, - impl: inspectImplName || null, - }); +app.get('/stats', (req, res) => { + res.json({ + bots_online: botController.getReadyAmount(), + bots_total: botController.bots.length, + queue_size: queue.queue.length, + queue_concurrency: queue.concurrency, + }); }); -app.get("/health", (req, res) => { - res.json({ ok: true }); +const http_server = require('http').Server(app); +http_server.listen(CONFIG.http.port); +winston.info('Listening for HTTP on port: ' + CONFIG.http.port); + +queue.process(CONFIG.logins.length, botController, async (job) => { + const itemData = await botController.lookupFloat(job.data.link); + winston.debug(`Received itemData for ${job.data.link.getParams().a}`); + + // Save and remove the delay attribute + let delay = itemData.delay; + delete itemData.delay; + + // add the item info to the DB + await postgres.insertItemData(itemData.iteminfo, job.data.price); + + // Get rank, annotate with game files + itemData.iteminfo = Object.assign(itemData.iteminfo, await postgres.getItemRank(itemData.iteminfo.a)); + gameData.addAdditionalItemProperties(itemData.iteminfo); + + itemData.iteminfo = utils.removeNullValues(itemData.iteminfo); + itemData.iteminfo.stickers = itemData.iteminfo.stickers.map((s) => utils.removeNullValues(s)); + itemData.iteminfo.keychains = itemData.iteminfo.keychains.map((s) => utils.removeNullValues(s)); + + job.data.job.setResponse(job.data.link.getParams().a, itemData.iteminfo); + + return delay; }); -app.listen(PORT, () => { - process.stdout.write(`inspector listening on ${PORT}\n`); +queue.on('job failed', (job, err) => { + const params = job.data.link.getParams(); + winston.warn(`Job Failed! S: ${params.s} A: ${params.a} D: ${params.d} M: ${params.m} IP: ${job.ip}, Err: ${(err || '').toString()}`); + + job.data.job.setResponse(params.a, errors.TTLExceeded); }); diff --git a/lib/scheduler.js b/lib/scheduler.js deleted file mode 100644 index e69de29..0000000 diff --git a/lib/schedulerInstance.js b/lib/schedulerInstance.js deleted file mode 100644 index 5add65b..0000000 --- a/lib/schedulerInstance.js +++ /dev/null @@ -1,25 +0,0 @@ -const config = require("../config"); -const { createLongestIdleScheduler } = require("./scheduler"); - -function proxyFor(i, proxies) { - if (!proxies || proxies.length === 0) return null; - return proxies[i % proxies.length]; -} - -const accounts = Array.isArray(config.logins) ? config.logins : []; -const proxies = Array.isArray(config.proxies) ? config.proxies : []; - -const pool = accounts.map((a, i) => ({ - id: a.user, - account: a, - proxy: proxyFor(i, proxies), - busy: false, - lastReleaseAt: 0, - nextAvailableAt: 0 -})); - -const scheduler = createLongestIdleScheduler(pool, () => Date.now(), { - cooldownMs: (config.scheduler && config.scheduler.cooldownMs) || 1100 -}); - -module.exports = { scheduler, pool }; From 9d4448e6b3307ed07241e01c48ec4029d195e6f9 Mon Sep 17 00:00:00 2001 From: ze4-404 Date: Mon, 13 Oct 2025 00:27:36 +0200 Subject: [PATCH 3/3] #3 Implement initial scheduler logic --- index.js | 38 +++++++++++++++++++++++--------------- lib/bot_controller.js | 31 +++++++++++++++++++++++++++---- lib/pairScheduler.js | 21 +++++++++++++++++++++ 3 files changed, 71 insertions(+), 19 deletions(-) create mode 100644 lib/pairScheduler.js diff --git a/index.js b/index.js index 157376c..5e1e9d7 100644 --- a/index.js +++ b/index.js @@ -17,6 +17,7 @@ const winston = require('winston'), postgres = new (require('./lib/postgres'))(CONFIG.database_url, CONFIG.enable_bulk_inserts), gameData = new (require('./lib/game_data'))(CONFIG.game_files_update_interval, CONFIG.enable_game_file_updates), errors = require('./errors'), + PairScheduler = require('./lib/pair_scheduler'), Job = require('./lib/job'); if (CONFIG.max_simultaneous_requests === undefined) { @@ -34,24 +35,31 @@ if (args.steam_data) { CONFIG.bot_settings.steam_user.dataDirectory = args.steam_data; } +const pairs = []; for (let [i, loginData] of CONFIG.logins.entries()) { - const settings = Object.assign({}, CONFIG.bot_settings); - if (CONFIG.proxies && CONFIG.proxies.length > 0) { - const proxy = CONFIG.proxies[i % CONFIG.proxies.length]; - - if (proxy.startsWith('http://')) { - settings.steam_user = Object.assign({}, settings.steam_user, {httpProxy: proxy}); - } else if (proxy.startsWith('socks5://')) { - settings.steam_user = Object.assign({}, settings.steam_user, {socksProxy: proxy}); - } else { - console.log(`Invalid proxy '${proxy}' in config, must prefix with http:// or socks5://`); - process.exit(1); - } - } - - botController.addBot(loginData, settings); + const settings = Object.assign({}, CONFIG.bot_settings); + let proxy = null; + + if (CONFIG.proxies && CONFIG.proxies.length > 0) { + proxy = CONFIG.proxies[i % CONFIG.proxies.length]; + + if (proxy.startsWith('http://')) { + settings.steam_user = Object.assign({}, settings.steam_user, { httpProxy: proxy }); + } else if (proxy.startsWith('socks5://')) { + settings.steam_user = Object.assign({}, settings.steam_user, { socksProxy: proxy }); + } else { + console.log(`Invalid proxy '${proxy}' in config, must prefix with http:// or socks5://`); + process.exit(1); + } + } + + const bot = botController.addBot(loginData, settings); // RETURN bot + pairs.push({ bot: bot.id, proxy }); // COLLECT pair } +const scheduler = new PairScheduler(pairs); +botController.setScheduler(scheduler); + postgres.connect(); // Setup and configure express diff --git a/lib/bot_controller.js b/lib/bot_controller.js index 0d8a25e..fc1a6d1 100644 --- a/lib/bot_controller.js +++ b/lib/bot_controller.js @@ -8,13 +8,18 @@ class BotController extends EventEmitter { super(); this.readyEvent = false; - this.bots = []; + this.bots = []; + this.byId = new Map(); + this.scheduler = null; } addBot(loginData, settings) { let bot = new Bot(settings); bot.logIn(loginData.user, loginData.pass, loginData.auth); + this.bots.push(bot); + this.byId.set(bot.id, bot); + bot.on('ready', () => { if (!this.readyEvent && this.hasBotOnline()) { this.readyEvent = true; @@ -29,7 +34,7 @@ class BotController extends EventEmitter { } }); - this.bots.push(bot); + return bot; } getFreeBot() { @@ -60,11 +65,29 @@ class BotController extends EventEmitter { } lookupFloat(data) { - let freeBot = this.getFreeBot(); + const pair = this.getFreePair(); + if (pair) return pair.bot.sendFloatRequest(data); + const freeBot = this.getFreeBot(); if (freeBot) return freeBot.sendFloatRequest(data); - else return Promise.reject(errors.NoBotsAvailable); + + return Promise.reject(errors.NoBotsAvailable); } + + setScheduler(scheduler) { + this.scheduler = scheduler; + } + + getFreePair() { + if (!this.scheduler) return null; + const tries = this.bots.length; + for (let i = 0; i < tries; i++) { + const { bot: botId, proxy } = this.scheduler.next(); + const b = this.byId.get(botId); + if (b && b.ready && !b.busy) return { bot: b, proxy }; + } + return null; + } } module.exports = BotController; diff --git a/lib/pairScheduler.js b/lib/pairScheduler.js new file mode 100644 index 0000000..2a05f2a --- /dev/null +++ b/lib/pairScheduler.js @@ -0,0 +1,21 @@ +class PairScheduler{ + constructor(pairs) { + this.clock = 1; + this.items = pairs.map(x => ({ ...x, lastUsed: 0 })); + } + next() { + let min = this.items[0], idx = 0; + for (let i = 1; i < this.items.length; i++) { + if (this.items[i].lastUsed < min.lastUsed) { + min = this.items[i]; + idx = i; + } + } + this.items[idx].lastUsed = this.clock++; + return { bot: this.items[idx].bot, proxy: this.items[idx].proxy }; + } + all() { + return this.items.map(({ bot, proxy }) => ({ bot, proxy })); + } +} +module.exports = PairScheduler; \ No newline at end of file