diff --git a/src/logger.js b/src/logger.js new file mode 100644 index 0000000..0759cb1 --- /dev/null +++ b/src/logger.js @@ -0,0 +1,239 @@ +import fs from 'fs'; +import path from 'path'; +import os from 'os'; + +class TranscodeLogger { + constructor(logFilePath = null, maxLogs = 100) { + // Use NODE_NAME env var for unique log files, fallback to hostname + const nodeName = process.env.NODE_NAME || os.hostname().split('.')[0]; + this.nodeName = nodeName; + this.logFilePath = logFilePath || `logs/transcode-${nodeName}.log`; + this.maxLogs = maxLogs; + this.logs = []; + this.loadLogs(); + console.log(`📝 Logger initialized for node: ${nodeName}`); + } + + loadLogs() { + try { + // Ensure log directory exists + const logDir = path.dirname(this.logFilePath); + if (!fs.existsSync(logDir)) { + fs.mkdirSync(logDir, { recursive: true }); + } + + if (fs.existsSync(this.logFilePath)) { + const data = fs.readFileSync(this.logFilePath, 'utf8'); + this.logs = data.trim().split('\n') + .filter(line => line.trim()) + .map(line => JSON.parse(line)) + .slice(-this.maxLogs); // Keep only the last maxLogs entries + } + } catch (error) { + console.warn('⚠️ Could not load existing logs:', error.message); + this.logs = []; + } + } + + saveLogs() { + try { + // Keep only the last maxLogs entries + this.logs = this.logs.slice(-this.maxLogs); + + const logData = this.logs.map(log => JSON.stringify(log)).join('\n') + '\n'; + fs.writeFileSync(this.logFilePath, logData); + } catch (error) { + console.error('❌ Could not save logs:', error.message); + } + } + + addLog(logEntry) { + const enrichedLog = { + ...logEntry, + timestamp: new Date().toISOString(), + id: logEntry.id || 'unknown', + node: this.nodeName + }; + + this.logs.push(enrichedLog); + this.saveLogs(); + + // Also log to console with rich formatting + this.consoleLog(enrichedLog); + } + + consoleLog(log) { + const emoji = this.getStatusEmoji(log.status); + const duration = log.duration ? `${log.duration}ms` : 'N/A'; + + console.log(`${emoji} [${log.timestamp}] [${log.node || 'unknown'}] ${log.status.toUpperCase()}`); + console.log(` 🆔 ID: ${log.id}`); + console.log(` 👤 User: ${log.user || 'anonymous'}${log.userHP ? ` (HP: ${log.userHP})` : ''}`); + console.log(` 📁 File: ${log.filename || 'unknown'} (${log.fileSize || 0} bytes)`); + console.log(` 📍 IP: ${log.clientIP || 'unknown'}`); + console.log(` 🖥️ Device: ${log.deviceInfo || 'unknown'}`); + console.log(` 🌐 Platform: ${log.platform || 'unknown'}`); + console.log(` ⏱️ Duration: ${duration}`); + + if (log.correlationId) { + console.log(` � Correlation: ${log.correlationId}`); + } + + if (log.viewport) { + console.log(` 📐 Viewport: ${log.viewport}`); + } + + if (log.connectionType) { + console.log(` 📶 Connection: ${log.connectionType}`); + } + + if (log.cid) { + console.log(` 📦 CID: ${log.cid}`); + } + + if (log.error) { + console.log(` ❌ Error: ${log.error}`); + } + + if (log.gatewayUrl) { + console.log(` 🌐 URL: ${log.gatewayUrl}`); + } + + console.log(''); // Empty line for readability + } + + getStatusEmoji(status) { + const emojis = { + 'started': '🚀', + 'processing': '⚙️', + 'uploading': '☁️', + 'completed': '✅', + 'failed': '❌', + 'error': '💥' + }; + return emojis[status] || '📝'; + } + + logTranscodeStart({ id, user, filename, fileSize, clientIP, userAgent, origin, platform, deviceInfo, browserInfo, userHP, correlationId, viewport, connectionType }) { + this.addLog({ + id, + status: 'started', + user: user || 'anonymous', + filename, + fileSize, + clientIP, + userAgent: userAgent?.substring(0, 100), + origin, + platform: platform || 'unknown', + deviceInfo: deviceInfo || 'unknown', + browserInfo: browserInfo || '', + userHP: userHP || 0, + correlationId: correlationId || null, + viewport: viewport || null, + connectionType: connectionType || null, + startTime: Date.now() + }); + } + + logTranscodeComplete({ id, user, filename, cid, gatewayUrl, duration, clientIP }) { + // Find the original start log to preserve device context + const startLog = this.logs.find(log => log.id === id && log.status === 'started'); + + this.addLog({ + id, + status: 'completed', + user: user || 'anonymous', + filename, + cid, + gatewayUrl, + duration, + clientIP, + success: true, + // Preserve device info from start log + platform: startLog?.platform || null, + deviceInfo: startLog?.deviceInfo || null, + browserInfo: startLog?.browserInfo || null, + userHP: startLog?.userHP || null, + correlationId: startLog?.correlationId || null, + viewport: startLog?.viewport || null, + connectionType: startLog?.connectionType || null + }); + } + + logTranscodeError({ id, user, filename, error, duration, clientIP }) { + // Find the original start log to preserve device context + const startLog = this.logs.find(log => log.id === id && log.status === 'started'); + + this.addLog({ + id, + status: 'failed', + user: user || 'anonymous', + filename, + error: error?.message || error || 'Unknown error', + duration, + clientIP, + success: false, + // Preserve device info from start log + platform: startLog?.platform || null, + deviceInfo: startLog?.deviceInfo || null, + browserInfo: startLog?.browserInfo || null, + userHP: startLog?.userHP || null, + correlationId: startLog?.correlationId || null, + viewport: startLog?.viewport || null, + connectionType: startLog?.connectionType || null + }); + } + + logFFmpegProgress({ id, progress, timeElapsed }) { + // Don't save progress logs to file (too noisy), just console log + console.log(`⏳ [FFMPEG-PROGRESS] ID: ${id} | Progress: ${progress} | Elapsed: ${timeElapsed}`); + } + + getRecentLogs(limit = 10) { + return this.logs.slice(-limit).reverse(); // Most recent first + } + + getLogsForDashboard(limit = 5) { + return this.logs.slice(-limit).reverse().map(log => ({ + id: log.id, + timestamp: log.timestamp, + user: log.user, + filename: log.filename, + status: log.status, + duration: log.duration, + error: log.error, + cid: log.cid, + fileSize: log.fileSize, + clientIP: log.clientIP, + platform: log.platform, + deviceInfo: log.deviceInfo, + userHP: log.userHP, + correlationId: log.correlationId, + viewport: log.viewport, + connectionType: log.connectionType, + node: log.node + })); + } + + getStats() { + const total = this.logs.length; + const successful = this.logs.filter(log => log.success === true).length; + const failed = this.logs.filter(log => log.success === false).length; + const inProgress = this.logs.filter(log => log.status === 'started' || log.status === 'processing').length; + + const avgDuration = this.logs + .filter(log => log.duration && log.success === true) + .reduce((sum, log, _, arr) => sum + log.duration / arr.length, 0); + + return { + total, + successful, + failed, + inProgress, + avgDuration: Math.round(avgDuration), + successRate: total > 0 ? Math.round((successful / total) * 100) : 0 + }; + } +} + +export default TranscodeLogger; diff --git a/src/server.js b/src/server.js index 186c9d0..81dcccc 100644 --- a/src/server.js +++ b/src/server.js @@ -9,21 +9,41 @@ import morgan from 'morgan'; import axios from 'axios'; import FormData from 'form-data'; import { v4 as uuidv4 } from 'uuid'; +import TranscodeLogger from './logger.js'; const app = express(); +const logger = new TranscodeLogger(); -// Open CORS (no credentials). Put this BEFORE routes. +// Store active transcoding progress for SSE clients +const activeJobs = new Map(); + +// CORS app.use((req, res, next) => { res.setHeader('Access-Control-Allow-Origin', '*'); - res.setHeader('Access-Control-Allow-Methods', 'GET,POST,OPTIONS'); - res.setHeader('Access-Control-Allow-Headers', 'Content-Type'); + res.setHeader('Access-Control-Allow-Methods', 'GET,POST,OPTIONS,PUT,DELETE'); + res.setHeader('Access-Control-Allow-Headers', 'Content-Type,Authorization,Accept,X-Requested-With'); + res.setHeader('Access-Control-Max-Age', '86400'); if (req.method === 'OPTIONS') return res.sendStatus(204); next(); }); -// Keep health probes happy -app.get('/', (_req, res) => res.send('🎬 Video Worker - Ready for transcoding!')); -app.head('/', (_req, res) => res.sendStatus(200)); +// Request logging +app.use((req, res, next) => { + const startTime = Date.now(); + const clientIP = req.ip || req.connection.remoteAddress || req.headers['x-forwarded-for'] || 'unknown'; + const origin = req.get('Origin') || req.get('Referer') || 'direct'; + console.log(`🌐 [${new Date().toISOString()}] ${req.method} ${req.path} - Client: ${clientIP} - Origin: ${origin}`); + if (req.path === '/transcode') { + console.log(`📊 TRANSCODE REQUEST START: IP=${clientIP} Origin=${origin}`); + } + res.on('finish', () => { + const duration = Date.now() - startTime; + if (req.path === '/transcode') { + console.log(`✅ TRANSCODE REQUEST COMPLETE - ${res.statusCode} - ${duration}ms`); + } + }); + next(); +}); const PORT = process.env.PORT || 8080; const PINATA_JWT = process.env.PINATA_JWT; @@ -31,13 +51,64 @@ const PINATA_GATEWAY = process.env.PINATA_GATEWAY || 'https://ipfs.skatehive.app const PINATA_GROUP_VIDEOS = process.env.PINATA_GROUP_VIDEOS || null; if (!PINATA_JWT) { - console.warn('⚠️ PINATA_JWT is not set. Set it in your environment before starting.'); + console.warn('⚠️ PINATA_JWT is not set.'); } app.use(morgan('combined')); + +// Health & info +app.get('/', (_req, res) => res.send('🎬 Video Worker - Ready for transcoding!')); +app.head('/', (_req, res) => res.sendStatus(200)); app.get('/healthz', (_req, res) => res.json({ ok: true, service: 'video-worker', timestamp: new Date().toISOString() })); -// Configure multer to write incoming file to the OS temp dir +// SSE endpoint for real-time progress streaming +app.get('/progress/:requestId', (req, res) => { + const { requestId } = req.params; + + res.setHeader('Content-Type', 'text/event-stream'); + res.setHeader('Cache-Control', 'no-cache'); + res.setHeader('Connection', 'keep-alive'); + res.setHeader('Access-Control-Allow-Origin', '*'); + res.flushHeaders(); + + if (!activeJobs.has(requestId)) { + activeJobs.set(requestId, { progress: 0, stage: 'waiting', clients: new Set() }); + } + + const job = activeJobs.get(requestId); + job.clients.add(res); + res.write(`data: ${JSON.stringify({ progress: job.progress, stage: job.stage })}\n\n`); + + req.on('close', () => { + job.clients.delete(res); + if (job.clients.size === 0 && job.stage === 'complete') { + activeJobs.delete(requestId); + } + }); +}); + +function broadcastProgress(requestId, progress, stage) { + const job = activeJobs.get(requestId); + if (!job) return; + job.progress = progress; + job.stage = stage; + const message = JSON.stringify({ progress, stage }); + for (const client of job.clients) { + client.write(`data: ${message}\n\n`); + } +} + +// Dashboard endpoints +app.get('/logs', (_req, res) => { + const limit = parseInt(_req.query.limit) || 10; + res.json({ logs: logger.getLogsForDashboard(limit), stats: logger.getStats() }); +}); + +app.get('/stats', (_req, res) => { + res.json(logger.getStats()); +}); + +// Multer config const upload = multer({ storage: multer.diskStorage({ destination: (_req, _file, cb) => cb(null, os.tmpdir()), @@ -48,7 +119,7 @@ const upload = multer({ } }); -// Get video duration using ffprobe +// Get video duration via ffprobe function getVideoDuration(inputPath) { return new Promise((resolve) => { const proc = spawn('ffprobe', [ @@ -66,35 +137,100 @@ function getVideoDuration(inputPath) { }); } -function runFfmpeg(args) { +// Parse FFmpeg time to seconds +function timeToSeconds(timeStr) { + const parts = timeStr.split(':'); + return parseFloat(parts[0]) * 3600 + parseFloat(parts[1]) * 60 + parseFloat(parts[2]); +} + +function runFfmpeg(args, requestId = 'unknown', totalDuration = 0) { return new Promise((resolve, reject) => { + const startTime = Date.now(); const proc = spawn('ffmpeg', args, { stdio: ['ignore', 'pipe', 'pipe'] }); let stderr = ''; - proc.stderr.on('data', (d) => { stderr += d.toString(); }); + + proc.stderr.on('data', (d) => { + stderr += d.toString(); + const progressMatch = d.toString().match(/time=(\d{2}:\d{2}:\d{2}\.\d{2})/); + if (progressMatch) { + const timeElapsed = Date.now() - startTime; + const currentTime = timeToSeconds(progressMatch[1]); + let percent = 0; + if (totalDuration > 0) { + percent = Math.min(80, Math.round((currentTime / totalDuration) * 80)); + } + broadcastProgress(requestId, percent, 'transcoding'); + logger.logFFmpegProgress({ id: requestId, progress: progressMatch[1], percent, timeElapsed }); + } + }); + proc.on('close', (code) => { - if (code === 0) resolve({ ok: true }); - else reject(new Error(`ffmpeg exited with ${code}: ${stderr.slice(-4000)}`)); + const duration = Date.now() - startTime; + if (code === 0) { + console.log(`✅ [FFMPEG-SUCCESS] ID: ${requestId} | Duration: ${duration}ms`); + broadcastProgress(requestId, 80, 'uploading'); + resolve({ ok: true }); + } else { + console.error(`❌ [FFMPEG-ERROR] ID: ${requestId} | Code: ${code} | Duration: ${duration}ms`); + broadcastProgress(requestId, 0, 'error'); + reject(new Error(`ffmpeg exited with ${code}: ${stderr.slice(-4000)}`)); + } }); }); } -// POST /transcode (multipart: video [required], creator [optional], thumbnail/thumbnailUrl [optional]) +// POST /transcode app.post('/transcode', upload.single('video'), async (req, res) => { + const internalId = uuidv4().substring(0, 8); + const startTime = Date.now(); + const clientIP = req.ip || req.connection.remoteAddress || 'unknown'; + const userAgent = req.get('User-Agent') || 'unknown'; + const origin = req.get('Origin') || req.get('Referer') || 'direct'; + + const body = req.body || {}; + const creator = (body.creator || body.user || 'anonymous').toString().trim().slice(0, 64); + const sourceApp = body.source_app || body.sourceApp || 'unknown'; + const platform = body.platform || 'unknown'; + + // Use client correlationId for SSE progress (so client can subscribe before request) + const correlationId = body.correlationId || null; + const requestId = correlationId || internalId; + + console.log(`🔗 Request ID for SSE: ${requestId} (correlationId: ${correlationId || 'none'})`); + + logger.logTranscodeStart({ + id: requestId, + user: creator, + sourceApp, + filename: req.file?.originalname || 'unknown', + fileSize: req.file?.size || 0, + clientIP, + userAgent, + origin, + platform, + correlationId + }); + if (!req.file) { return res.status(400).json({ error: 'No file uploaded. Send multipart/form-data with field "video".' }); } - const requestId = uuidv4().substring(0, 8); - const startTime = Date.now(); const inputPath = req.file.path; const outName = `${uuidv4()}.mp4`; const outputPath = path.join(os.tmpdir(), outName); + // Initialize SSE job tracking - preserve existing clients if SSE connected first + const existingJob = activeJobs.get(requestId); + const clients = existingJob?.clients || new Set(); + activeJobs.set(requestId, { progress: 0, stage: 'starting', clients }); + console.log(`📡 SSE clients for ${requestId}: ${clients.size}`); + broadcastProgress(requestId, 5, 'receiving'); + try { - // Get video duration for metadata const videoDuration = await getVideoDuration(inputPath); + console.log(`📏 Video duration: ${videoDuration}s`); + broadcastProgress(requestId, 10, 'transcoding'); - // Transcode to H.264/AAC MP4 const ffArgs = [ '-y', '-i', inputPath, @@ -106,25 +242,18 @@ app.post('/transcode', upload.single('video'), async (req, res) => { '-movflags', '+faststart', outputPath ]; - await runFfmpeg(ffArgs); - // Upload to Pinata - if (!PINATA_JWT) { - throw new Error('PINATA_JWT not configured on server'); - } + await runFfmpeg(ffArgs, requestId, videoDuration); + + if (!PINATA_JWT) throw new Error('PINATA_JWT not configured on server'); - // Read optional form fields - const body = req.body || {}; - const creator = (body.creator || 'anonymous').toString().trim().slice(0, 64); - const sourceApp = body.source_app || body.sourceApp || 'unknown'; - const platform = body.platform || 'unknown'; const thumbnailRaw = (body.thumbnail ?? body.thumbnailUrl ?? '').toString().trim(); const thumbnail = thumbnailRaw ? thumbnailRaw.slice(0, 2048) : ''; const form = new FormData(); form.append('file', fs.createReadStream(outputPath), { filename: outName, contentType: 'video/mp4' }); - // Pinata metadata - standardized schema (max 10 keyvalues) + // Pinata metadata - max 10 keyvalues const uploadDate = new Date().toISOString(); const metadata = { name: `${creator}-${uploadDate}.mp4`, @@ -143,23 +272,27 @@ app.post('/transcode', upload.single('video'), async (req, res) => { }; form.append('pinataMetadata', JSON.stringify(metadata)); - // Pinata options - Groups support for organized uploads const options = { cidVersion: 1, ...(PINATA_GROUP_VIDEOS && { groupId: PINATA_GROUP_VIDEOS }) }; form.append('pinataOptions', JSON.stringify(options)); + broadcastProgress(requestId, 85, 'uploading'); + const resp = await axios.post( 'https://api.pinata.cloud/pinning/pinFileToIPFS', form, { - headers: { - ...form.getHeaders(), - Authorization: `Bearer ${PINATA_JWT}` - }, + headers: { ...form.getHeaders(), Authorization: `Bearer ${PINATA_JWT}` }, maxContentLength: Infinity, maxBodyLength: Infinity, + onUploadProgress: (progressEvent) => { + if (progressEvent.total) { + const uploadPercent = Math.round((progressEvent.loaded / progressEvent.total) * 15); + broadcastProgress(requestId, 85 + uploadPercent, 'uploading'); + } + } } ); @@ -167,34 +300,45 @@ app.post('/transcode', upload.single('video'), async (req, res) => { const gatewayUrl = `${PINATA_GATEWAY.replace(/\/+$/, '')}/${cid}`; const totalDuration = Date.now() - startTime; - console.log(`✅ Transcoded & uploaded: ${req.file.originalname} → ${cid} (${totalDuration}ms)`); + broadcastProgress(requestId, 100, 'complete'); - res.status(200).json({ + logger.logTranscodeComplete({ + id: requestId, + user: creator, + filename: req.file.originalname, cid, gatewayUrl, - requestId, duration: totalDuration, - creator, - sourceApp, - timestamp: uploadDate + clientIP }); + res.status(200).json({ cid, gatewayUrl, requestId, duration: totalDuration, creator, sourceApp, timestamp: uploadDate }); + } catch (err) { const totalDuration = Date.now() - startTime; - console.error(`❌ Transcode failed: ${err.message}`, { - requestId, - file: req.file?.originalname, - duration: totalDuration, - pinataError: err.response?.data + broadcastProgress(requestId, 0, 'error'); + + console.error('❌ [ERROR] Full details:', { + message: err.message, + response: err.response?.data, + status: err.response?.status }); - res.status(500).json({ - error: err.message || 'Transcode failed', - requestId, - duration: totalDuration + + logger.logTranscodeError({ + id: requestId, + user: creator, + filename: req.file?.originalname || 'unknown', + error: err.message || err, + duration: totalDuration, + clientIP }); + + res.status(500).json({ error: err.message || 'Transcode failed', requestId, duration: totalDuration, timestamp: new Date().toISOString() }); + } finally { try { fs.unlinkSync(inputPath); } catch {} try { fs.unlinkSync(outputPath); } catch {} + setTimeout(() => { activeJobs.delete(requestId); }, 5000); } }); @@ -202,6 +346,9 @@ app.listen(PORT, () => { console.log(`🎬 Video worker listening on :${PORT}`); console.log(`🔗 Health check: http://localhost:${PORT}/healthz`); console.log(`🎯 Transcode endpoint: http://localhost:${PORT}/transcode`); + console.log(`🌊 Progress SSE: http://localhost:${PORT}/progress/:requestId`); + console.log(`📊 Logs: http://localhost:${PORT}/logs`); + console.log(`📈 Stats: http://localhost:${PORT}/stats`); console.log(`🌐 Gateway: ${PINATA_GATEWAY}`); if (PINATA_GROUP_VIDEOS) console.log(`📁 Pinata Group: ${PINATA_GROUP_VIDEOS}`); });