Skip to content

Commit

Permalink
refactor(): More demux progress
Browse files Browse the repository at this point in the history
  • Loading branch information
jwerle committed Jan 13, 2020
1 parent 69e9cf7 commit 4d1f4ef
Show file tree
Hide file tree
Showing 2 changed files with 131 additions and 22 deletions.
114 changes: 107 additions & 7 deletions demux.js
Original file line number Diff line number Diff line change
@@ -1,16 +1,44 @@
const { Source } = require('./source')
const { ffmpeg } = require('./ffmpeg')
const { Track } = require('./track')
const through = require('through2')
const assert = require('assert')
const Batch = require('batch')
const debug = require('debug')('little-media-box:demux')
const once = require('once')
const path = require('path')

/**
* The default file extension name used for demux outputs
* @public
* @const
*/
const DEFAULT_DEMUX_FILE_EXTENSION_NAME = '.mkv'

/**
* Creates and returns an array of output options
* for the source demuxer.
* @private
* @param {?(Array)} extras
* @return {Array}
*/
function createDemuxOutputOptions(extras) {
const options = ['-c', 'copy', '-f', 'matroska']
return Array.from(new Set(options.concat(extras || [])))
}

/**
* Demux source streams into own outputs. This function writes demuxed streams
* to the file system and provides them as `Source` instances to the caller in
* the `calllback()` function upon success.
* @public
* @param {Source|Track|String|Object} source
* @param {?(Object)} opts
* @param {?(String)} opts.cwd
* @param {?(String)} opts.extname
* @param {?(Number)} opts.streamIndex
* @param {?(Array)} opts.demuxOptions
* @param {?(Array<Number>)} opts.streamIndices
* @param {Function<Error, Array<Source>>} callback
*/
function demux(source, opts, callback) {
Expand All @@ -31,10 +59,17 @@ function demux(source, opts, callback) {
const inputs = []
const outputs = new Set()
const pending = []
const streams = Array.isArray(opts.streams) ? opts.streams : []
const streamIndices = Array.isArray(opts.streamIndices)
? opts.streamIndices
: []

const stream = source.createReadStream()
const demuxer = ffmpeg(stream)
const demuxOptions = createDemuxOutputOptions(opts.demuxOptions)

if (0 === streamIndices.length && 'number' === typeof opts.streamIndex) {
streamIndices.push(opts.streamIndex)
}

source.ready(onready)

Expand All @@ -49,26 +84,25 @@ function demux(source, opts, callback) {
function onprobe(err, info) {
if (err) { return callback(err) }

if (streams.length > 0) {
for (const index of streams) {
if (streamIndices.length > 0) {
for (const index of streamIndices) {
inputs.push(info.streams[index])
}
} else {
inputs.push(...info.streams)
}

const { cwd = source.cwd } = opts
const { demuxOptions } = source

for (const input of inputs) {
const { index, codec_type, tags } = input
const outputOptions = demuxOptions.concat([`-map 0:${index}`])
const language = tags.language || ''
const extname = opts.extname || '.mkv'
const language = tags && tags.language || ''
const extname = opts.extname || DEFAULT_DEMUX_FILE_EXTENSION_NAME
const output = `${index}_${codec_type}_${language}${extname}`

debug('demux(): output:', output)
debug('demux(): options:', demuxOptions)
debug('demux(): options:', outputOptions)

demuxer.output(output)
demuxer.outputOptions(outputOptions)
Expand Down Expand Up @@ -108,9 +142,75 @@ function demux(source, opts, callback) {
}
}

/**
* Creates a demux stream from a source. The stream can be
* selected by specifying the stream index with `opts.streamIndex`.
* The default stream index is `0` if it is not given.
* @public
* @param {Source|Track|String|Object} source
* @param {?(Object)} opts
* @param {?(String)} opts.cwd
* @param {?(Number)} opts.streamIndex
* @param {?(Array)} opts.demuxOptions
* @return {Stream}
*/
function createDemuxStream(source, opts) {
if (!opts || 'object' !== typeof opts) {
opts = {}
}

// coerce into `Source` instance and get own copy
source = Source.from(source)

const { streamIndex = 0 } = opts
const sourceStream = source.createReadStream()
const demuxStream = through()
const demuxer = ffmpeg(sourceStream)

const demuxOptions = createDemuxOutputOptions(opts.demuxOptions)
const outputOptions = demuxOptions.concat([`-map 0:${streamIndex}`])

source.ready(onready)

demuxer.outputOptions(outputOptions)
demuxer.output(demuxStream)
demuxer.on('error', onerror)
demuxer.on('end', onend)

debug('createDemuxStream(): output options:', outputOptions)

return Object.assign(demuxStream, { demuxer })

function onready(err) {
if (err) { return demuxStream.emit('error', err) }

source.active()
demuxer.run()
}

function onerror(err) {
source.inactive()
demuxStream.emit('error', err)
}

function onend(err, stdout, stderr) {
debug('demux(): ffmpeg:', stdout)
debug('demux(): ffmpeg:', stderr)

if (err) {
return onerror(err)
}

source.inactive()
}
}

/**
* Module exports.
*/
module.exports = {
DEFAULT_DEMUX_FILE_EXTENSION_NAME,

createDemuxStream,
demux
}
39 changes: 24 additions & 15 deletions source.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,6 @@ const uuid = require('uuid/v4')
const url = require('url')
const fs = require('fs')

/**
* Creates and returns an array of output options
* for the source demuxer.
* @private
* @param {?(Array)} extras
* @return {Array}
*/
function createDemuxOutputOptions(extras) {
const options = ['-c', 'copy', '-f', 'matroska']
return Array.from(new Set(options.concat(extras || [])))
}

/**
* The `Source` class represents a container for a HTTP
* resource or local file that can be consumed as a readable
Expand All @@ -48,6 +36,7 @@ class Source extends Resource {
*/
static from (uri, opts) {
let source = null
let stream = null

if (!opts || 'object' !== typeof opts) {
opts = {}
Expand All @@ -64,6 +53,14 @@ class Source extends Resource {
source = uri.source
}

// possibly a `Stream` instance or something like that
// looks like one (quack)
if (uri && uri.on && uri.push && uri.pipe) {
stream = uri
source = stream.source || null
uri = stream.uri || null
}

// create a new `Source` from existing instance copying
// properties over allowing input `opts` to take precedence
if (source) {
Expand All @@ -72,7 +69,8 @@ class Source extends Resource {
cwd: opts.cwd || source.cwd,
duration: opts.duration || source.duration,
byteLength: opts.byteLength || source.byteLength,
demuxOptions: opts.demuxOptions || source.demuxOptions
demuxOptions: opts.demuxOptions || source.demuxOptions,
stream,
})
}

Expand All @@ -85,9 +83,9 @@ class Source extends Resource {
* @param {?(Object)} opts
* @param {?(String)} opts.id
* @param {?(String)} opts.cwd
* @param {?(Stream)} opts.stream
* @param {?(Number)} opts.duration
* @param {?(Number)} opts.byteLength
* @param {?(Array)} opts.demuxOptions
*/
constructor(uri, opts) {
super()
Expand All @@ -99,6 +97,7 @@ class Source extends Resource {
this.id = opts.id || uuid()
this.uri = uri
this.cwd = opts.cwd || process.cwd()
this.stream = opts.stream || null
this.duration = opts.duration || 0
this.byteLength = opts.byteLength || 0
this.demuxOptions = createDemuxOutputOptions(opts.demuxOptions)
Expand All @@ -115,6 +114,15 @@ class Source extends Resource {
return process.nextTick(callback, null)
}

// try attached stream
if (this.stream) {
ffmpeg(this.stream).ffprobe((err, info) => {
if (err) { return callback() }
this.byteLength = parseInt(info.format.size)
callback(null)
})
}

const uri = url.parse(this.uri)
if (/https?:/.test(uri.protocol)) {
head(this.uri, (err, res) => {
Expand Down Expand Up @@ -169,7 +177,8 @@ class Source extends Resource {

getUri(uri, onstream)

return readStream
// set `source` so `Source.from(readStream)` return a valid source with state intact
return Object.assign(readStream, { source: this })

function onstream(err, sourceStream) {
if (err) { return readStream.emit('error', err) }
Expand Down

0 comments on commit 4d1f4ef

Please sign in to comment.