diff --git a/inc/streams/DataStream.h b/inc/streams/DataStream.h
index dc69e5f6..012b6fea 100644
--- a/inc/streams/DataStream.h
+++ b/inc/streams/DataStream.h
@@ -47,6 +47,11 @@ DEALINGS IN THE SOFTWARE.
#define DATASTREAM_SAMPLE_RATE_UNKNOWN 0.0f
+#define DATASTREAM_DONT_CARE 0
+#define DATASTREAM_NOT_WANTED 1
+#define DATASTREAM_WANTED 2
+
+
namespace codal
{
/**
@@ -55,7 +60,6 @@ namespace codal
class DataSink
{
public:
-
virtual int pullRequest();
};
@@ -64,6 +68,8 @@ namespace codal
*/
class DataSource
{
+ int dataIsWanted;
+
public:
virtual ManagedBuffer pull();
virtual void connect(DataSink &sink);
@@ -72,7 +78,39 @@ namespace codal
virtual int getFormat();
virtual int setFormat(int format);
virtual float getSampleRate();
- virtual float requestSampleRate(float sampleRate);
+ virtual void dataWanted(int wanted);
+ virtual int isWanted();
+ };
+
+ /**
+ * This class acts as a base class for objects that serve both the DataSource and DataSink interfaces.
+ * Classes anre not required to use this, but are strongly encouraged to use this as a base class, in order
+ * to reduce complexity, ensure consistent behaviour, and reduce duplicated code.
+ */
+ class DataSourceSink : public DataSource, public DataSink
+ {
+
+ public:
+ DataSink *downStream;
+ DataSource &upStream;
+
+ /**
+ * Constructor.
+ * Creates an empty DataSourceSink.
+ *
+ * @param upstream the component that will normally feed this datastream with data.
+ */
+ DataSourceSink(DataSource &upstream);
+ virtual ~DataSourceSink();
+
+ virtual void connect(DataSink &sink);
+ virtual bool isConnected();
+ virtual void disconnect();
+ virtual int getFormat();
+ virtual int setFormat(int format);
+ virtual float getSampleRate();
+ virtual void dataWanted(int wanted);
+ virtual int pullRequest();
};
/**
@@ -80,19 +118,14 @@ namespace codal
* A Datastream holds a number of ManagedBuffer references, provides basic flow control through a push/pull mechanism
* and byte level access to the datastream, even if it spans different buffers.
*/
- class DataStream : public DataSource, public DataSink
+ class DataStream : public DataSourceSink
{
uint16_t pullRequestEventCode;
- uint16_t flowEventCode;
ManagedBuffer nextBuffer;
bool hasPending;
bool isBlocking;
- unsigned int missedBuffers;
int downstreamReturn;
- DataSink *downStream;
- DataSource *upStream;
-
public:
/**
@@ -109,58 +142,12 @@ namespace codal
*/
~DataStream();
- /**
- * Controls if this component should emit flow state events.
- *
- * @warning Should not be called mutliple times with `id == 0`, as it will spuriously reallocate event IDs
- *
- * @param id If zero, this will auto-allocate a new event ID
- * @return uint16_t The new event ID for this DataStream
- */
- uint16_t emitFlowEvents( uint16_t id = 0 );
-
/**
* Determines if any of the data currently flowing through this stream is held in non-volatile (FLASH) memory.
* @return true if one or more of the ManagedBuffers in this stream reside in FLASH memory, false otherwise.
*/
bool isReadOnly();
- /**
- * Attempts to determine if another component downstream of this one is _actually_ pulling data, and thus, data
- * is flowing.
- *
- * @return true If there is a count-match between `pullRequest` and `pull` calls.
- * @return false If `pullRequest` calls are not currently being matched by `pull` calls.
- */
- bool isFlowing();
-
- /**
- * Define a downstream component for data stream.
- *
- * @sink The component that data will be delivered to, when it is available
- */
- virtual void connect(DataSink &sink) override;
-
- /**
- * Determines if this source is connected to a downstream component
- *
- * @return true If a downstream is connected
- * @return false If a downstream is not connected
- */
- virtual bool isConnected();
-
- /**
- * Define a downstream component for data stream.
- *
- * @sink The component that data will be delivered to, when it is available
- */
- virtual void disconnect() override;
-
- /**
- * Determine the data format of the buffers streamed out of this component.
- */
- virtual int getFormat() override;
-
/**
* Determines if this stream acts in a synchronous, blocking mode or asynchronous mode. In blocking mode, writes to a full buffer
* will result int he calling fiber being blocked until space is available. Downstream DataSinks will also attempt to process data
@@ -187,29 +174,7 @@ namespace codal
*/
virtual int pullRequest();
- /**
- * Query the stream for its current sample rate.
- *
- * If the current object is unable to determine this itself, it will pass the call upstream until it reaches a component can respond.
- *
- * @warning The sample rate for a stream may change during its lifetime. If a component is sensitive to this, it should periodically check.
- *
- * @return float The current sample rate for this stream, or `DATASTREAM_SAMPLE_RATE_UNKNOWN` if none is found.
- */
- virtual float getSampleRate() override;
-
- /**
- * Request a new sample rate on this stream.
- *
- * Most components will simply forward this call upstream, and upon reaching a data source, if possible the source should change
- * the sample rate to accomodate the request.
- *
- * @warning Not all sample rates will be possible for all devices, so if the caller needs to know the _actual_ rate, they should check the returned value here
- *
- * @param sampleRate The requested sample rate, to be handled by the nearest component capable of doing so.
- * @return float The actual sample rate this stream will now run at, may differ from the requested sample rate.
- */
- virtual float requestSampleRate(float sampleRate) override;
+ virtual void connect(DataSink &sink);
private:
/**
diff --git a/inc/streams/EffectFilter.h b/inc/streams/EffectFilter.h
index f2de3834..29a5ae82 100644
--- a/inc/streams/EffectFilter.h
+++ b/inc/streams/EffectFilter.h
@@ -7,12 +7,8 @@
namespace codal
{
- class EffectFilter : public DataSource, public DataSink
+ class EffectFilter : public DataSourceSink
{
- protected:
-
- DataSink *downStream;
- DataSource &upStream;
bool deepCopy;
public:
@@ -21,15 +17,6 @@ namespace codal
~EffectFilter();
virtual ManagedBuffer pull();
- virtual int pullRequest();
- virtual void connect( DataSink &sink );
- bool isConnected();
- virtual void disconnect();
- virtual int getFormat();
- virtual int setFormat( int format );
-
- virtual float getSampleRate();
- virtual float requestSampleRate(float sampleRate);
/**
* Defines if this filter should perform a deep copy of incoming data, or update data in place.
diff --git a/inc/streams/FIFOStream.h b/inc/streams/FIFOStream.h
deleted file mode 100644
index a16503b2..00000000
--- a/inc/streams/FIFOStream.h
+++ /dev/null
@@ -1,52 +0,0 @@
-#ifndef FIFO_STREAM_H
-#define FIFO_STREAM_H
-
-#include "ManagedBuffer.h"
-#include "DataStream.h"
-
-
-#define FIFO_MAXIMUM_BUFFERS 256
-
-namespace codal {
-
- class FIFOStream : public DataSource, public DataSink
- {
- private:
-
- ManagedBuffer buffer[FIFO_MAXIMUM_BUFFERS];
- int bufferCount;
- int bufferLength;
-
- bool allowInput;
- bool allowOutput;
-
- DataSink *downStream;
- DataSource &upStream;
-
- public:
-
- FIFOStream( DataSource &source );
- ~FIFOStream();
-
- virtual ManagedBuffer pull();
- virtual int pullRequest();
- virtual void connect( DataSink &sink );
- bool isConnected();
- virtual void disconnect();
- virtual int getFormat();
- virtual int setFormat( int format );
- int length();
- void dumpState();
-
- bool canPull();
- bool isFull();
-
- void setInputEnable( bool state );
- void setOutputEnable( bool state );
-
-
- };
-
-}
-
-#endif
\ No newline at end of file
diff --git a/inc/streams/LevelDetectorSPL.h b/inc/streams/LevelDetectorSPL.h
index f016cfcc..7bc9445c 100644
--- a/inc/streams/LevelDetectorSPL.h
+++ b/inc/streams/LevelDetectorSPL.h
@@ -36,6 +36,7 @@ DEALINGS IN THE SOFTWARE.
#define LEVEL_DETECTOR_SPL_HIGH_THRESHOLD_PASSED 0x02
#define LEVEL_DETECTOR_SPL_LOW_THRESHOLD_PASSED 0x04
#define LEVEL_DETECTOR_SPL_CLAP 0x08
+#define LEVEL_DETECTOR_SPL_DATA_REQUESTED 0x10
/**
@@ -82,6 +83,7 @@ DEALINGS IN THE SOFTWARE.
#define LEVEL_DETECTOR_SPL_CLAP_MIN_LOUD_BLOCKS 2 // ensure noise not too short to be a clap
#define LEVEL_DETECTOR_SPL_CLAP_MIN_QUIET_BLOCKS 20 // prevent very fast taps being registered as clap
+#define LEVEL_DETECTOR_SPL_TIMEOUT 50 // Time in ms at which we request no further data.
namespace codal{
class LevelDetectorSPL : public CodalComponent, public DataSink
@@ -97,18 +99,18 @@ namespace codal{
int sigma; // Running total of the samples in the current window.
float gain;
float minValue;
- bool activated; // Has this component been connected yet
- bool enabled; // Is the component currently running
- int unit; // The units to be returned from this level detector (e.g. dB or linear 8bit)
- int quietBlockCount; // number of quiet blocks consecutively - used for clap detection
- int noisyBlockCount; // number of noisy blocks consecutively - used for clap detection
- bool inNoisyBlock; // if had noisy and waiting to lower beyond lower threshold
- float maxRms; // maximum rms within a noisy block
+ bool enabled; // Is the component currently running.
+ int unit; // The units to be returned from this level detector (e.g. dB or linear 8bit).
+ int quietBlockCount; // number of quiet blocks consecutively - used for clap detection.
+ int noisyBlockCount; // number of noisy blocks consecutively - used for clap detection.
+ bool inNoisyBlock; // if had noisy and waiting to lower beyond lower threshold.
+ float maxRms; // maximum rms within a noisy block.
private:
- uint64_t timeout; // The timestamp at which this component will cease actively sampling the data stream
- uint8_t bufferCount; // Used to track that enough buffers have been seen since activation to output a valid value/event
- FiberLock resourceLock;
+ uint8_t bufferCount; // Used to track that enough buffers have been seen since activation to output a valid value/event.
+ uint8_t listenerCount; // The total number of active listeners to this component.
+ FiberLock resourceLock; // Fiberlock - used purely hold fibers requesting data before it is available.
+ uint64_t timestamp; // Timestamp of the last time someone requesed data from this component.
public:
/**
@@ -122,9 +124,13 @@ namespace codal{
*/
LevelDetectorSPL(DataSource &source, float highThreshold, float lowThreshold, float gain,
float minValue = 52,
- uint16_t id = DEVICE_ID_SYSTEM_LEVEL_DETECTOR_SPL,
- bool activateImmediately = true);
+ uint16_t id = DEVICE_ID_SYSTEM_LEVEL_DETECTOR_SPL);
+ /**
+ * Periodic callback, every 6ms or so.
+ */
+ void periodicCallback();
+
/**
* Callback provided when data is ready.
*/
@@ -139,11 +145,10 @@ namespace codal{
float getValue( int scale = -1 );
/**
- * Keep this component active and processing buffers so that events can be produced
- *
- * @param state If set to true, this component will connect (if required) and start consuming buffers
+ * Callback when a listener to this component is added.
+ * n.b. we currently don't support removing listners (future work if necessary)
*/
- void activateForEvents( bool state );
+ void listenerAdded();
/**
* Disable component
diff --git a/inc/streams/StreamFlowTrigger.h b/inc/streams/StreamFlowTrigger.h
deleted file mode 100644
index f13c81f3..00000000
--- a/inc/streams/StreamFlowTrigger.h
+++ /dev/null
@@ -1,37 +0,0 @@
-#include "ManagedBuffer.h"
-#include "DataStream.h"
-
-#ifndef STREAM_FLOW_TRIGGER_H
-#define STREAM_FLOW_TRIGGER_H
-
-#define TRIGGER_PULL 1
-#define TRIGGER_REQUEST 2
-
-namespace codal {
-
- class StreamFlowTrigger : public DataSource, public DataSink {
- private:
-
- DataSink *downStream;
- DataSource &upStream;
-
- void (*eventHandler)(int);
-
- public:
-
- StreamFlowTrigger( DataSource &source );
- ~StreamFlowTrigger();
-
- void setDataHandler( void (*handler)(int) );
-
- virtual ManagedBuffer pull();
- virtual int pullRequest();
- virtual void connect( DataSink &sink );
- bool isConnected();
- virtual void disconnect();
- virtual int getFormat();
- virtual int setFormat( int format );
- };
-}
-
-#endif
\ No newline at end of file
diff --git a/inc/streams/StreamNormalizer.h b/inc/streams/StreamNormalizer.h
index 00d353a6..b6c51c36 100644
--- a/inc/streams/StreamNormalizer.h
+++ b/inc/streams/StreamNormalizer.h
@@ -41,7 +41,7 @@ typedef void (*SampleWriteFn)(uint8_t *, int);
namespace codal{
- class StreamNormalizer : public DataSink, public DataSource
+ class StreamNormalizer : public DataSourceSink
{
public:
int outputFormat; // The format to output in. By default, this is the sme as the input.
@@ -52,9 +52,7 @@ namespace codal{
bool normalize; // If set, will recalculate a zero offset.
bool zeroOffsetValid; // Set to true after the first buffer has been processed.
bool outputEnabled; // When set any bxuffer processed will be forwarded downstream.
- DataSource &upstream; // The upstream component of this StreamNormalizer.
DataStream output; // The downstream output stream of this StreamNormalizer.
- //ManagedBuffer buffer; // The buffer being processed.
static SampleReadFn readSample[9];
static SampleWriteFn writeSample[9];
@@ -94,11 +92,6 @@ namespace codal{
*/
bool getNormalize();
- /**
- * Determine the data format of the buffers streamed out of this component.
- */
- virtual int getFormat();
-
/**
* Defines the data format of the buffers streamed out of this component.
* @param format valid values include:
@@ -113,7 +106,7 @@ namespace codal{
* DATASTREAM_FORMAT_32BIT_SIGNED
*/
virtual int setFormat(int format);
-
+ virtual int getFormat();
/**
* Defines an optional gain to apply to the input, as a floating point multiple.
*
@@ -137,18 +130,6 @@ namespace codal{
*/
int setOrMask(uint32_t mask);
- float getSampleRate();
-
- float requestSampleRate(float sampleRate);
-
- /**
- * Determines if this source is connected to a downstream component
- *
- * @return true If a downstream is connected
- * @return false If a downstream is not connected
- */
- bool isConnected();
-
/**
* Destructor.
*/
diff --git a/inc/streams/StreamRecording.h b/inc/streams/StreamRecording.h
index 3e1b390f..ac58d482 100644
--- a/inc/streams/StreamRecording.h
+++ b/inc/streams/StreamRecording.h
@@ -6,76 +6,45 @@
// Pretty much the largest sensible number we can have on a Micro:bit v2
#ifndef CODAL_DEFAULT_STREAM_RECORDING_MAX_LENGTH
- #define CODAL_DEFAULT_STREAM_RECORDING_MAX_LENGTH 50000 // 50k, in bytes
+ #define CODAL_DEFAULT_STREAM_RECORDING_MAX_LENGTH 51200
#endif
+#ifndef CODAL_STREAM_RECORDING_BUFFER_SIZE
+ #define CODAL_STREAM_RECORDING_BUFFER_SIZE 256
+#endif
+
+#define CODAL_STREAM_RECORDING_SIZE (CODAL_DEFAULT_STREAM_RECORDING_MAX_LENGTH / CODAL_STREAM_RECORDING_BUFFER_SIZE)
+
#define REC_STATE_STOPPED 0
#define REC_STATE_PLAYING 1
#define REC_STATE_RECORDING 2
namespace codal
{
-
- class StreamRecording_Buffer {
- public:
- ManagedBuffer buffer;
- StreamRecording_Buffer * next;
-
- StreamRecording_Buffer( ManagedBuffer data ) {
- this->buffer = data;
- this->next = NULL;
- }
- };
-
- class StreamRecording : public DataSource, public DataSink
+ class StreamRecording : public DataSourceSink
{
private:
-
- //ManagedBuffer buffer[REC_MAX_BUFFERS];
- //StreamRecording_Buffer_t * bufferChain;
- StreamRecording_Buffer * lastBuffer;
- StreamRecording_Buffer * readHead;
- uint32_t maxBufferLenth;
- uint32_t totalBufferLength;
- uint32_t totalMemoryUsage;
- int state;
- float lastUpstreamRate;
-
- DataSink *downStream;
- DataSource &upStream;
-
- void initialise();
+ uint32_t totalBufferLength; // Amount of data currently stored in this object.
+ int state; // STOPPED/PLAYING/RECORDING.
+ ManagedBuffer data[CODAL_STREAM_RECORDING_SIZE]; // Buffers of data, each CODAL_STREAM_RECORDING_BUFFER_SIZE in length.
+ int readOffset; // Index into the buffer, indicating the current point for playback.
+ int writeOffset; // Index into the buffer, indicating the current point for recording.
+ FiberLock recordLock, playLock; // Indicates to synchronous recording threads when recording/playback is complete.
public:
-
- StreamRecording_Buffer * bufferChain;
-
/**
* @brief Construct a new Stream Recording object
*
* @param source An upstream DataSource to connect to
* @param length The maximum amount of memory (RAM) in bytes to allow this recording object to use. Defaults to CODAL_DEFAULT_STREAM_RECORDING_MAX_LENGTH.
*/
- StreamRecording( DataSource &source, uint32_t length = CODAL_DEFAULT_STREAM_RECORDING_MAX_LENGTH );
-
- /**
- * @brief Destroy the Stream Recording object
- */
- ~StreamRecording();
+ StreamRecording( DataSource &source);
virtual ManagedBuffer pull();
virtual int pullRequest();
- virtual void connect( DataSink &sink );
- bool isConnected();
- virtual void disconnect();
- virtual int getFormat();
- virtual int setFormat( int format );
-
- void printChain();
/**
* @brief Calculate and return the length in bytes that this StreamRecording represents
- *
* @return int The length, in bytes.
*/
int length();
@@ -86,45 +55,27 @@ namespace codal
* As this cannot be known by this class (as the sample rate may change during playback or recording) the expected rate must be supplied.
*
* @param sampleRate The sample rate to calculate the duration for, in samples per second.
- * @return long The total duration of this StreamRecording, based on the supplied sample rate, in seconds.
+ * @return The total duration of this StreamRecording, based on the supplied sample rate, in seconds.
*/
float duration( unsigned int sampleRate );
- /**
- * @brief Downstream classes should use this to determing if there is data to pull from this StreamRecording object.
- *
- * @return true If data is available
- * @return false If the object is completely empty
- */
- bool canPull();
-
- /**
- * @brief Checks if this object can store any further ManagedBuffers from the upstream components.
- *
- * @note This does not mean that RAM is completely full, but simply that there is now more internal storage for ManagedBuffer references.
- *
- * @return true If there are no more slots available to track more ManagedBuffers.
- * @return false If there is remaining internal storage capacity for more data
- */
- bool isFull();
-
/**
* @brief Begin recording data from the connected upstream
*
- * The StreamRecording object will, if already playing; stop playback, erase its buffer, and start recording.
+ * The StreamRecording object is not already recording, it will stop any existing playback, erase its buffer, and start recording.
*
* Non-blocking, will return immediately.
*
- * @return Returns true if the object state actually changed (ie. we weren't already recording)
+ * @return Returns DEVICE_OK on completion.
*/
- bool recordAsync();
+ int recordAsync();
/**
* @brief Begin recording data from the connected upstream
*
- * The StreamRecording object will, if already playing; stop playback, erase its buffer, and start recording.
+ * The StreamRecording object is not already recording, it will stop any existing playback, erase its buffer, and start recording.
*
- * Blocking call, will repeatedly deschedule the current fiber until the recording completes.
+ * Blocking call, will deschedule the current fiber until the recording completes.
*/
void record();
@@ -135,16 +86,16 @@ namespace codal
*
* Non-blocking, will return immediately.
*
- * @return Returns true if the object state actually changed (ie. we weren't already recording)
+ * @return Returns DEVICE_OK on completion.
*/
- bool playAsync();
+ int playAsync();
/**
* @brief Begin playing data from the connected upstream
*
* The StreamRecording object will, if already recording; stop recording, rewind to the start of its buffer, and start playing.
*
- * Blocking call, will repeatedly deschedule the current fiber until the playback completes.
+ * Blocking call, will deschedule the current fiber until the playback completes.
*/
void play();
@@ -155,7 +106,7 @@ namespace codal
*
* @return Do not use this value, return semantics are changing.
*/
- bool stop();
+ int stop();
/**
* @brief Erase the internal buffer.
@@ -185,8 +136,6 @@ namespace codal
*/
bool isStopped();
- virtual float getSampleRate();
-
};
}
diff --git a/inc/streams/StreamSplitter.h b/inc/streams/StreamSplitter.h
index 976caff2..c3301963 100644
--- a/inc/streams/StreamSplitter.h
+++ b/inc/streams/StreamSplitter.h
@@ -55,18 +55,16 @@ namespace codal{
class StreamSplitter;
- class SplitterChannel : public DataSource, public DataSink {
+ class SplitterChannel : public DataSourceSink {
private:
StreamSplitter * parent;
- float sampleRate;
- unsigned int inUnderflow;
+ int sampleDropRate = 1;
+ int sampleDropPosition = 0;
+ int sampleSigma = 0;
ManagedBuffer resample( ManagedBuffer _in, uint8_t * buffer = NULL, int length = -1 );
public:
- int pullAttempts; // Number of failed pull request attempts
- uint32_t sentBuffers;
- DataSink * output;
/**
* @brief Construct a new Splitter Channel object.
@@ -80,30 +78,24 @@ namespace codal{
SplitterChannel( StreamSplitter *parent, DataSink *output );
virtual ~SplitterChannel();
- virtual int pullRequest();
uint8_t * pullInto( uint8_t * rawBuffer, int length );
virtual ManagedBuffer pull();
- virtual void connect(DataSink &sink);
- bool isConnected();
- virtual void disconnect();
virtual int getFormat();
virtual int setFormat(int format);
+ virtual int requestSampleDropRate(int sampleDropRate);
virtual float getSampleRate();
- virtual float requestSampleRate(float sampleRate);
+ virtual void dataWanted(int wanted);
};
class StreamSplitter : public DataSink, public CodalComponent
{
private:
ManagedBuffer lastBuffer; // Buffer being processed
- uint64_t __cycle;
public:
- bool isActive; // Track if we need to emit activate/deactivate messages
int channels; // Current number of channels Splitter is serving
- volatile int activeChannels; // Current number of /active/ channels this Splitter is serving
DataSource &upstream; // The upstream component of this Splitter
- SplitterChannel * outputChannels[CONFIG_MAX_CHANNELS]; // Array of SplitterChannels the Splitter is serving
+ SplitterChannel *outputChannels[CONFIG_MAX_CHANNELS]; // Array of SplitterChannels the Splitter is serving
/**
* Creates a component that distributes a single upstream datasource to many downstream datasinks
@@ -111,24 +103,20 @@ namespace codal{
* @param source a DataSource to receive data from
*/
StreamSplitter(DataSource &source, uint16_t id = CodalComponent::generateDynamicID());
+ virtual ~StreamSplitter();
/**
* Callback provided when data is ready.
*/
virtual int pullRequest();
+ virtual void dataWanted(int wanted);
virtual ManagedBuffer getBuffer();
virtual SplitterChannel * createChannel();
virtual bool destroyChannel( SplitterChannel * channel );
virtual SplitterChannel * getChannel( DataSink * output );
- /**
- * Destructor.
- */
- virtual ~StreamSplitter();
-
friend SplitterChannel;
-
};
}
diff --git a/source/streams/DataStream.cpp b/source/streams/DataStream.cpp
index 91539487..3b153b25 100644
--- a/source/streams/DataStream.cpp
+++ b/source/streams/DataStream.cpp
@@ -28,6 +28,7 @@ DEALINGS IN THE SOFTWARE.
#include "Event.h"
#include "CodalFiber.h"
#include "ErrorNo.h"
+#include "CodalDmesg.h"
using namespace codal;
@@ -61,75 +62,98 @@ float DataSource::getSampleRate() {
return DATASTREAM_SAMPLE_RATE_UNKNOWN;
}
-float DataSource::requestSampleRate(float sampleRate) {
- // Just consume this by default, we don't _have_ to honour requests for specific rates.
- return DATASTREAM_SAMPLE_RATE_UNKNOWN;
+void DataSource::dataWanted(int wanted)
+{
+ dataIsWanted = wanted;
+}
+
+int DataSource::isWanted()
+{
+ return dataIsWanted;
}
+//DataSink methods.
int DataSink::pullRequest()
{
return DEVICE_NOT_SUPPORTED;
}
-DataStream::DataStream(DataSource &upstream)
+// DataSourceSink methods.
+DataSourceSink::DataSourceSink(DataSource &source) : upStream( source )
{
- this->pullRequestEventCode = 0;
- this->isBlocking = true;
- this->hasPending = false;
- this->missedBuffers = CODAL_DATASTREAM_HIGH_WATER_MARK;
- this->downstreamReturn = DEVICE_OK;
- this->flowEventCode = 0;
+ downStream = NULL;
+ source.connect( *this );
+ dataWanted(DATASTREAM_DONT_CARE);
+}
- this->downStream = NULL;
- this->upStream = &upstream;
+DataSourceSink::~DataSourceSink()
+{
}
-DataStream::~DataStream()
+void DataSourceSink::connect(DataSink &sink)
{
+ downStream = &sink;
}
-uint16_t DataStream::emitFlowEvents( uint16_t id )
+bool DataSourceSink::isConnected()
+{
+ return downStream != NULL;
+}
+
+void DataSourceSink::disconnect()
{
- if( this->flowEventCode == 0 ) {
- if( id == 0 )
- this->flowEventCode = allocateNotifyEvent();
- else
- this->flowEventCode = id;
- }
- return this->flowEventCode;
+ downStream = NULL;
}
-bool DataStream::isReadOnly()
+int DataSourceSink::getFormat()
{
- if( this->hasPending )
- return this->nextBuffer.isReadOnly();
- return true;
+ return upStream.getFormat();
}
-bool DataStream::isFlowing()
+int DataSourceSink::setFormat(int format)
{
- return this->missedBuffers < CODAL_DATASTREAM_HIGH_WATER_MARK;
+ return upStream.setFormat( format );
}
-void DataStream::connect(DataSink &sink)
+float DataSourceSink::getSampleRate()
{
- this->downStream = &sink;
- this->upStream->connect(*this);
+ return upStream.getSampleRate();
}
-bool DataStream::isConnected()
+void DataSourceSink::dataWanted(int wanted)
{
- return this->downStream != NULL;
+ DataSource::dataWanted(wanted);
+ return upStream.dataWanted(wanted);
}
-int DataStream::getFormat()
+int DataSourceSink::pullRequest()
{
- return upStream->getFormat();
+ if( this->downStream != NULL )
+ return this->downStream->pullRequest();
+ return DEVICE_BUSY;
}
-void DataStream::disconnect()
+/**
+ * Definition for a DataStream class. This doesn't *really* belong in here, as its key role is to
+ * decouple a pipeline the straddles an interrupt context boundary...
+ */
+DataStream::DataStream(DataSource &upstream) : DataSourceSink(upstream)
+{
+ this->pullRequestEventCode = 0;
+ this->isBlocking = true;
+ this->hasPending = false;
+ this->downstreamReturn = DEVICE_OK;
+}
+
+DataStream::~DataStream()
{
- this->downStream = NULL;
+}
+
+bool DataStream::isReadOnly()
+{
+ if( this->hasPending )
+ return this->nextBuffer.isReadOnly();
+ return true;
}
void DataStream::setBlocking(bool isBlocking)
@@ -148,14 +172,9 @@ void DataStream::setBlocking(bool isBlocking)
ManagedBuffer DataStream::pull()
{
- // 1, as we will normally be at '1' waiting buffer here if we're in-sync with the source
- if( this->missedBuffers > 1 )
- Event evt( DEVICE_ID_NOTIFY, this->flowEventCode );
-
- this->missedBuffers = 0;
// Are we running in sync (blocking) mode?
if( this->isBlocking )
- return this->upStream->pull();
+ return this->upStream.pull();
this->hasPending = false;
return ManagedBuffer( this->nextBuffer ); // Deep copy!
@@ -177,12 +196,6 @@ bool DataStream::canPull(int size)
int DataStream::pullRequest()
{
- // _Technically_ not a missed buffer... yet. But we can only check later.
- if( this->missedBuffers < CODAL_DATASTREAM_HIGH_WATER_MARK )
- if( ++this->missedBuffers == CODAL_DATASTREAM_HIGH_WATER_MARK )
- if( this->flowEventCode != 0 )
- Event evt( DEVICE_ID_NOTIFY, this->flowEventCode );
-
// Are we running in async (non-blocking) mode?
if( !this->isBlocking ) {
if( this->hasPending && this->downstreamReturn != DEVICE_OK ) {
@@ -190,7 +203,7 @@ int DataStream::pullRequest()
return this->downstreamReturn;
}
- this->nextBuffer = this->upStream->pull();
+ this->nextBuffer = this->upStream.pull();
this->hasPending = true;
Event evt( DEVICE_ID_NOTIFY, this->pullRequestEventCode );
@@ -203,14 +216,8 @@ int DataStream::pullRequest()
return DEVICE_BUSY;
}
-float DataStream::getSampleRate() {
- if( this->upStream != NULL )
- return this->upStream->getSampleRate();
- return DATASTREAM_SAMPLE_RATE_UNKNOWN;
-}
-
-float DataStream::requestSampleRate(float sampleRate) {
- if( this->upStream != NULL )
- return this->upStream->requestSampleRate( sampleRate );
- return DATASTREAM_SAMPLE_RATE_UNKNOWN;
+void DataStream::connect(DataSink &sink)
+{
+ DMESG("CONNECT REQUEST: this: %p, sink: %p", this, &sink);
+ this->downStream = &sink;
}
\ No newline at end of file
diff --git a/source/streams/EffectFilter.cpp b/source/streams/EffectFilter.cpp
index 5e81ddb4..77285e65 100644
--- a/source/streams/EffectFilter.cpp
+++ b/source/streams/EffectFilter.cpp
@@ -6,11 +6,9 @@
using namespace codal;
-EffectFilter::EffectFilter(DataSource &source, bool deepCopy) : upStream( source )
+EffectFilter::EffectFilter(DataSource &source, bool deepCopy) : DataSourceSink( source )
{
- this->downStream = NULL;
this->deepCopy = deepCopy;
- source.connect( *this );
}
EffectFilter::~EffectFilter()
@@ -26,48 +24,6 @@ ManagedBuffer EffectFilter::pull()
return output;
}
-int EffectFilter::pullRequest()
-{
- if( this->downStream != NULL )
- return this->downStream->pullRequest();
- return DEVICE_BUSY;
-}
-
-void EffectFilter::connect(DataSink &sink)
-{
- this->downStream = &sink;
-}
-
-bool EffectFilter::isConnected()
-{
- return this->downStream != NULL;
-}
-
-void EffectFilter::disconnect()
-{
- this->downStream = NULL;
-}
-
-int EffectFilter::getFormat()
-{
- return this->upStream.getFormat();
-}
-
-int EffectFilter::setFormat( int format )
-{
- return this->upStream.setFormat( format );
-}
-
-float EffectFilter::getSampleRate()
-{
- return this->upStream.getSampleRate();
-}
-
-float EffectFilter::requestSampleRate(float sampleRate)
-{
- return this->upStream.requestSampleRate( sampleRate );
-}
-
/**
* Defines if this filter should perform a deep copy of incoming data, or update data in place.
*
diff --git a/source/streams/FIFOStream.cpp b/source/streams/FIFOStream.cpp
deleted file mode 100644
index 7f10ae6d..00000000
--- a/source/streams/FIFOStream.cpp
+++ /dev/null
@@ -1,137 +0,0 @@
-#include "FIFOStream.h"
-#include "ErrorNo.h"
-#include "DataStream.h"
-#include "ManagedBuffer.h"
-#include "CodalDmesg.h"
-#include "MessageBus.h"
-
-using namespace codal;
-
-FIFOStream::FIFOStream( DataSource &source ) : upStream( source )
-{
- this->bufferCount = 0;
- this->bufferLength = 0;
-
- this->downStream = NULL;
- source.connect( *this );
-
- this->allowInput = false;
- this->allowOutput = false;
-
-}
-
-FIFOStream::~FIFOStream()
-{
- //
-}
-
-bool FIFOStream::canPull()
-{
- return (this->bufferLength > 0) && this->allowOutput;
-}
-
-ManagedBuffer FIFOStream::pull()
-{
- if( (this->bufferLength > 0) && this->allowOutput )
- {
- ManagedBuffer out = buffer[0];
-
- for (int i = 0; i < FIFO_MAXIMUM_BUFFERS-1; i++)
- buffer[i] = buffer[i + 1];
-
- buffer[FIFO_MAXIMUM_BUFFERS-1] = ManagedBuffer();
-
- this->bufferLength -= out.length();
- this->bufferCount--;
-
- if (this->bufferCount > 0 && downStream != NULL)
- downStream->pullRequest();
-
- return out;
- }
-
- return ManagedBuffer();
-}
-
-int FIFOStream::length()
-{
- return this->bufferLength;
-}
-
-bool FIFOStream::isFull() {
- return this->bufferCount < FIFO_MAXIMUM_BUFFERS;
-}
-
-void FIFOStream::dumpState()
-{
- DMESG(
- "TapeDeck { bufferCount = %d/%d, bufferLength = %dB }",
- this->bufferCount,
- FIFO_MAXIMUM_BUFFERS,
- this->bufferLength
- );
-}
-
-int FIFOStream::pullRequest()
-{
- if( this->bufferCount >= FIFO_MAXIMUM_BUFFERS )
- return DEVICE_NO_RESOURCES;
-
- ManagedBuffer inBuffer = this->upStream.pull();
- if( this->allowInput && inBuffer.length() > 0 )
- {
- this->buffer[ this->bufferCount++ ] = inBuffer;
- this->bufferLength += inBuffer.length();
- }
-
- if (bufferCount > 0 && this->allowOutput && downStream != NULL)
- return downStream->pullRequest();
-
- if( this->bufferCount >= FIFO_MAXIMUM_BUFFERS )
- return DEVICE_BUSY;
- return DEVICE_OK;
-}
-
-void FIFOStream::connect( DataSink &sink )
-{
- this->downStream = &sink;
-}
-
-bool FIFOStream::isConnected()
-{
- return this->downStream != NULL;
-}
-
-void FIFOStream::disconnect()
-{
- this->downStream = NULL;
-}
-
-int FIFOStream::getFormat()
-{
- return this->upStream.getFormat();
-}
-
-int FIFOStream::setFormat( int format )
-{
- return this->upStream.setFormat( format );
-}
-
-void FIFOStream::setInputEnable( bool state )
-{
- this->allowInput = state;
-}
-void FIFOStream::setOutputEnable( bool state )
-{
- bool enabling = false;
- DMESG("FIFO:setOutputEnable %d", state );
-
- if (this->allowOutput == false && state)
- enabling = true;
-
- this->allowOutput = state;
-
- // If we've just been enabled and have data to send, issue a pullrequest to ensure our downstream is aware of this
- if (enabling && bufferCount > 0 && downStream != NULL)
- downStream->pullRequest();
-}
\ No newline at end of file
diff --git a/source/streams/LevelDetectorSPL.cpp b/source/streams/LevelDetectorSPL.cpp
index 39032716..91b994fb 100644
--- a/source/streams/LevelDetectorSPL.cpp
+++ b/source/streams/LevelDetectorSPL.cpp
@@ -34,7 +34,7 @@ DEALINGS IN THE SOFTWARE.
using namespace codal;
-LevelDetectorSPL::LevelDetectorSPL(DataSource &source, float highThreshold, float lowThreshold, float gain, float minValue, uint16_t id, bool activateImmediately) : upstream(source), resourceLock(0)
+LevelDetectorSPL::LevelDetectorSPL(DataSource &source, float highThreshold, float lowThreshold, float gain, float minValue, uint16_t id) : upstream(source), resourceLock(0)
{
this->id = id;
this->level = 0;
@@ -43,16 +43,9 @@ LevelDetectorSPL::LevelDetectorSPL(DataSource &source, float highThreshold, floa
this->highThreshold = highThreshold;
this->minValue = minValue;
this->gain = gain;
- this->status |= LEVEL_DETECTOR_SPL_INITIALISED;
+ this->status |= LEVEL_DETECTOR_SPL_INITIALISED | LEVEL_DETECTOR_SPL_DATA_REQUESTED;
this->unit = LEVEL_DETECTOR_SPL_DB;
- enabled = true;
- if(activateImmediately){
- upstream.connect(*this);
- this->activated = true;
- }
- else{
- this->activated = false;
- }
+ this->enabled = true;
this->quietBlockCount = 0;
this->noisyBlockCount = 0;
@@ -60,15 +53,55 @@ LevelDetectorSPL::LevelDetectorSPL(DataSource &source, float highThreshold, floa
this->maxRms = 0;
this->bufferCount = 0;
- this->timeout = 0;
+ this->listenerCount = 0;
+
+ // Request a periodic callback
+ status |= DEVICE_COMPONENT_STATUS_SYSTEM_TICK;
+
+ source.connect(*this);
+}
+
+
+/**
+ * Periodic callback from Device system timer.
+ * Change the upstream active status accordingly.
+ */
+void LevelDetectorSPL::periodicCallback()
+{
+ // Ensure we don't timeout whilst waiting for data to stabilise.
+ if (this->bufferCount < LEVEL_DETECTOR_SPL_MIN_BUFFERS)
+ {
+ //DMESG("ALLOWING BUFFERS TO FILL...");
+ return;
+ }
+
+ // Calculate the time since the last request for data.
+ // If this is above the given threshold and our channel is active, request that the upstream generation of data be stopped.
+ if (status & LEVEL_DETECTOR_SPL_DATA_REQUESTED && !listenerCount && (system_timer->getTime() - this->timestamp >= LEVEL_DETECTOR_SPL_TIMEOUT))
+ {
+ //DMESG("LevelDetectorSPL: CALLBACK: DATA NO LONGER REQUIRED...");
+ this->status &= ~LEVEL_DETECTOR_SPL_DATA_REQUESTED;
+ upstream.dataWanted(DATASTREAM_NOT_WANTED);
+ }
}
int LevelDetectorSPL::pullRequest()
{
- // If we're not manually activated, not held active by a timeout, and we have no-one waiting on our data, bail.
- if( !activated && !(system_timer_current_time() - this->timeout < CODAL_STREAM_IDLE_TIMEOUT_MS) && resourceLock.getWaitCount() == 0 ) {
- this->bufferCount = 0;
- return DEVICE_BUSY;
+ //DMESG("LevelDetectorSPL: PR");
+
+ // If we haven't requested data and there are no active listeners, there's nothing to do.
+ if( this->bufferCount < LEVEL_DETECTOR_SPL_MIN_BUFFERS ) {
+ this->bufferCount++; // Here to prevent this endlessly increasing
+ return DEVICE_OK;
+ }
+
+ if( this->resourceLock.getWaitCount() > 0 )
+ this->resourceLock.notifyAll();
+
+ if (!(status & LEVEL_DETECTOR_SPL_DATA_REQUESTED || listenerCount))
+ {
+ //DMESG("LevelDetectorSPL: PR: ignoring data");
+ return DEVICE_OK;
}
ManagedBuffer b = upstream.pull();
@@ -153,13 +186,6 @@ int LevelDetectorSPL::pullRequest()
* EMIT EVENTS
******************************/
- if( this->bufferCount < LEVEL_DETECTOR_SPL_MIN_BUFFERS ) {
- this->bufferCount++; // Here to prevent this endlessly increasing
- return DEVICE_OK;
- }
- if( this->resourceLock.getWaitCount() > 0 )
- this->resourceLock.notifyAll();
-
// HIGH THRESHOLD
if ((!(status & LEVEL_DETECTOR_SPL_HIGH_THRESHOLD_PASSED)) && level > highThreshold)
{
@@ -217,31 +243,35 @@ int LevelDetectorSPL::pullRequest()
float LevelDetectorSPL::getValue( int scale )
{
- if( !this->upstream.isConnected() )
- this->upstream.connect( *this );
+ // Update out timestamp.
+ this->timestamp = system_timer->getTime();
+
+ if (!(status & LEVEL_DETECTOR_SPL_DATA_REQUESTED))
+ {
+ // We've just been asked for data after a (potentially) long wait.
+ // Let our upstream components know that we're interested in data again.
+ //DMESG("LevelDetectorSPL: getValue: dataWanted(1)");
+ status |= LEVEL_DETECTOR_SPL_DATA_REQUESTED;
+ upstream.dataWanted(DATASTREAM_WANTED);
+ }
// Lock the resource, THEN bump the timout, so we get consistent on-time
- if( this->bufferCount < LEVEL_DETECTOR_SPL_MIN_BUFFERS )
+ if(this->bufferCount < LEVEL_DETECTOR_SPL_MIN_BUFFERS)
+ {
+ //DMESG("WAITING ON LevelDetectorSPL::resourceLock...");
+ //codal_dmesg_flush();
resourceLock.wait();
-
- this->timeout = system_timer_current_time();
+ //DMESG("Escaped!");
+ //codal_dmesg_flush();
+ }
return splToUnit( this->level, scale );
}
-void LevelDetectorSPL::activateForEvents( bool state )
-{
- this->activated = state;
- if( this->activated && !this->upstream.isConnected() ) {
- this->upstream.connect( *this );
- }
-}
-
void LevelDetectorSPL::disable(){
enabled = false;
}
-
int LevelDetectorSPL::setLowThreshold(float value)
{
// Convert specified unit into db if necessary
@@ -353,6 +383,12 @@ float LevelDetectorSPL::unitToSpl(float level, int queryUnit)
return level;
}
+void LevelDetectorSPL::listenerAdded()
+{
+ this->listenerCount++;
+ this->getValue();
+}
+
LevelDetectorSPL::~LevelDetectorSPL()
{
}
diff --git a/source/streams/StreamFlowTrigger.cpp b/source/streams/StreamFlowTrigger.cpp
deleted file mode 100644
index a6191163..00000000
--- a/source/streams/StreamFlowTrigger.cpp
+++ /dev/null
@@ -1,63 +0,0 @@
-#include "StreamFlowTrigger.h"
-#include "ManagedBuffer.h"
-#include "DataStream.h"
-#include "CodalDmesg.h"
-
-using namespace codal;
-
-StreamFlowTrigger::StreamFlowTrigger( DataSource &source ) : upStream( source )
-{
- this->eventHandler = NULL;
- this->downStream = NULL;
- source.connect( *this );
-}
-
-StreamFlowTrigger::~StreamFlowTrigger()
-{
- // NOP
-}
-
-void StreamFlowTrigger::setDataHandler( void (*handler)(int) )
-{
- this->eventHandler = handler;
-}
-
-ManagedBuffer StreamFlowTrigger::pull()
-{
- (*this->eventHandler)( TRIGGER_PULL );
- return this->upStream.pull();
-}
-
-int StreamFlowTrigger::pullRequest()
-{
- (*this->eventHandler)( TRIGGER_REQUEST );
- if( this->downStream != NULL )
- return this->downStream->pullRequest();
-
- return DEVICE_BUSY;
-}
-
-void StreamFlowTrigger::connect( DataSink &sink )
-{
- this->downStream = &sink;
-}
-
-bool StreamFlowTrigger::isConnected()
-{
- return this->downStream != NULL;
-}
-
-void StreamFlowTrigger::disconnect()
-{
- this->downStream = NULL;
-}
-
-int StreamFlowTrigger::getFormat()
-{
- return this->upStream.getFormat();
-}
-
-int StreamFlowTrigger::setFormat( int format )
-{
- return this->upStream.setFormat( format );
-}
\ No newline at end of file
diff --git a/source/streams/StreamNormalizer.cpp b/source/streams/StreamNormalizer.cpp
index 9b8a04b8..0cffc276 100644
--- a/source/streams/StreamNormalizer.cpp
+++ b/source/streams/StreamNormalizer.cpp
@@ -125,7 +125,7 @@ SampleWriteFn StreamNormalizer::writeSample[] = {write_sample_1, write_sample_1,
* @param format The format to convert the input stream into
* @param stabilisation the maximum change of zero-offset permitted between subsequent buffers before output is initiated. Set to zero to disable (default)
*/
-StreamNormalizer::StreamNormalizer(DataSource &source, float gain, bool normalize, int format, int stabilisation) : upstream(source), output(*this)
+StreamNormalizer::StreamNormalizer(DataSource &source, float gain, bool normalize, int format, int stabilisation) : DataSourceSink(source), output(*this)
{
setFormat(format);
setGain(gain);
@@ -135,9 +135,6 @@ StreamNormalizer::StreamNormalizer(DataSource &source, float gain, bool normaliz
this->zeroOffset = 0;
this->stabilisation = stabilisation;
this->outputEnabled = normalize && stabilisation ? false : true;
-
- // Register with our upstream component
- source.connect(*this);
}
/**
@@ -157,9 +154,9 @@ ManagedBuffer StreamNormalizer::pull()
ManagedBuffer buffer; // The buffer being processed.
// Determine the input format.
- inputFormat = upstream.getFormat();
+ inputFormat = upStream.getFormat();
- // If no output format has been selected, infer it from our upstream component.
+ // If no output format has been selected, infer it from our upStream component.
if (outputFormat == DATASTREAM_FORMAT_UNKNOWN)
outputFormat = inputFormat;
@@ -168,7 +165,7 @@ ManagedBuffer StreamNormalizer::pull()
bytesPerSampleOut = DATASTREAM_FORMAT_BYTES_PER_SAMPLE(outputFormat);
// Acquire the buffer to be processed.
- ManagedBuffer inputBuffer = upstream.pull();
+ ManagedBuffer inputBuffer = upStream.pull();
samples = inputBuffer.length() / bytesPerSampleIn;
// Use in place processing where possible, but allocate a new buffer when needed.
@@ -177,7 +174,7 @@ ManagedBuffer StreamNormalizer::pull()
else
buffer = ManagedBuffer(samples * bytesPerSampleOut);
- // Initialise input an doutput buffer pointers.
+ // Initialise input and output buffer pointers.
data = &inputBuffer[0];
result = &buffer[0];
@@ -251,14 +248,8 @@ bool StreamNormalizer::getNormalize()
return normalize;
}
-/**
- * Determine the data format of the buffers streamed out of this component.
- */
int StreamNormalizer::getFormat()
{
- if (outputFormat == DATASTREAM_FORMAT_UNKNOWN)
- outputFormat = upstream.getFormat();
-
return outputFormat;
}
@@ -314,17 +305,3 @@ int StreamNormalizer::setOrMask(uint32_t mask)
StreamNormalizer::~StreamNormalizer()
{
}
-
-float StreamNormalizer::getSampleRate() {
- return this->upstream.getSampleRate();
-}
-
-float StreamNormalizer::requestSampleRate(float sampleRate) {
- return this->upstream.requestSampleRate( sampleRate );
-}
-
-bool StreamNormalizer::isConnected()
-{
- //return this->output.isConnected();
- return false;
-}
\ No newline at end of file
diff --git a/source/streams/StreamRecording.cpp b/source/streams/StreamRecording.cpp
index c5462dc9..958ecc76 100644
--- a/source/streams/StreamRecording.cpp
+++ b/source/streams/StreamRecording.cpp
@@ -7,67 +7,31 @@
using namespace codal;
-// Minimum memory overhead of adding a buffer to the chain
-// StreamRecording_Buffer contains a ManagedBuffer which points to a BufferData
-// StreamRecording_Buffer and BufferData are heap blocks with a PROCESSOR_WORD_TYPE overhead
-#define CODAL_STREAM_RECORDING_BUFFER_OVERHEAD \
- ( sizeof(StreamRecording_Buffer) + sizeof(BufferData) + 2 * sizeof(PROCESSOR_WORD_TYPE))
-
-
-StreamRecording::StreamRecording( DataSource &source, uint32_t maxLength ) : upStream( source )
+StreamRecording::StreamRecording(DataSource &source) : DataSourceSink( source ), recordLock(0, FiberLockMode::MUTEX), playLock(0, FiberLockMode::MUTEX)
{
this->state = REC_STATE_STOPPED;
-
- // The test for "full" was totalBufferLength >= maxBufferLenth
- // Adjust this number by the memory overhead
- // of the old default case with buffers of 256 bytes.
- this->maxBufferLenth = maxLength + ( maxLength / 256 + 1) * CODAL_STREAM_RECORDING_BUFFER_OVERHEAD;
-
- initialise();
-
- this->downStream = NULL;
- upStream.connect( *this );
-}
-
-StreamRecording::~StreamRecording()
-{
- //
-}
-
-void StreamRecording::initialise()
-{
this->totalBufferLength = 0;
- this->totalMemoryUsage = 0;
- this->lastBuffer = NULL;
- this->readHead = NULL;
- this->bufferChain = NULL;
- this->lastUpstreamRate = DATASTREAM_SAMPLE_RATE_UNKNOWN;
-}
-
-bool StreamRecording::canPull()
-{
- return this->totalMemoryUsage < this->maxBufferLenth;
+ this->readOffset = 0;
+ this->writeOffset = 0;
}
ManagedBuffer StreamRecording::pull()
{
- // Are we playing back?
- if( this->state != REC_STATE_PLAYING )
- return ManagedBuffer();
-
- // Do we have data to send?
- if( this->readHead == NULL ) {
- stop();
- return ManagedBuffer();
- }
-
- // Grab the next block and move the r/w head
- ManagedBuffer out = this->readHead->buffer;
- this->readHead = this->readHead->next;
+ ManagedBuffer out;
- // Prod the downstream that we're good to go
- if( downStream != NULL )
- downStream->pullRequest();
+ if( state == REC_STATE_PLAYING && readOffset < CODAL_STREAM_RECORDING_SIZE)
+ out = data[readOffset++];
+
+ // Wake any blocked threads once we reach the end of the playback
+ if (out.length() == 0)
+ {
+ state = REC_STATE_STOPPED;
+ playLock.notifyAll();
+ }
+ else
+ // Indicate to the downstream that another buffer is available.
+ if( downStream != NULL )
+ downStream->pullRequest();
// Return the block
return out;
@@ -83,110 +47,98 @@ float StreamRecording::duration( unsigned int sampleRate )
return ((float)this->length() / DATASTREAM_FORMAT_BYTES_PER_SAMPLE((float)this->getFormat()) ) / (float)sampleRate;
}
-bool StreamRecording::isFull() {
- return this->totalMemoryUsage >= this->maxBufferLenth;
-}
-
-void StreamRecording::printChain()
-{
- #if CONFIG_ENABLED(DMESG_SERIAL_DEBUG) && CONFIG_ENABLED(DMESG_AUDIO_DEBUG)
- DMESGN( "START -> " );
- StreamRecording_Buffer * node = this->bufferChain;
- while( node != NULL ) {
- DMESGN( "%x -> ", (int)(node->buffer.getBytes()) );
- codal_dmesg_flush();
- node = node->next;
- }
- DMESG( "END (%d hz)", (int)this->lastUpstreamRate );
- #endif
-}
-
int StreamRecording::pullRequest()
{
- // Are we recording?
+ // Ignore incoming buffers if we aren't actively recording
if( this->state != REC_STATE_RECORDING )
- return DEVICE_BUSY;
+ return DEVICE_OK;
- ManagedBuffer data = this->upStream.pull();
- this->lastUpstreamRate = this->upStream.getSampleRate();
+ ManagedBuffer buffer = upStream.pull();
- // Are we getting empty buffers (probably because we're out of RAM!)
- if( data == ManagedBuffer() || data.length() <= 1 ) {
+ // Ignore any empty buffers (possibly because we're out of RAM!)
+ if(buffer.length() == 0)
return DEVICE_OK;
- }
- // Can we record any more?
- if( !isFull() )
+ // Store the data in our buffer, if we have space
+ if (writeOffset < CODAL_DEFAULT_STREAM_RECORDING_MAX_LENGTH)
{
- StreamRecording_Buffer * block = new StreamRecording_Buffer( data );
- if( block == NULL )
- return DEVICE_NO_RESOURCES;
- block->next = NULL;
-
- // Are we initialising stuff? If so, hook the front of the chain up too...
- if( this->lastBuffer == NULL ) {
- this->bufferChain = block;
- } else
- this->lastBuffer->next = block;
-
- this->lastBuffer = block;
-
- uint32_t length = this->lastBuffer->buffer.length();
- this->totalBufferLength += length;
- this->totalMemoryUsage += length + CODAL_STREAM_RECORDING_BUFFER_OVERHEAD;
- return DEVICE_OK;
+ if (buffer.length() == CODAL_STREAM_RECORDING_BUFFER_SIZE)
+ {
+ // The incoming buffer is sufficiently large. Just store it.
+ int b = writeOffset / CODAL_STREAM_RECORDING_BUFFER_SIZE;
+ if (b < CODAL_STREAM_RECORDING_SIZE)
+ {
+ data[b] = buffer;
+ writeOffset += buffer.length();
+ totalBufferLength += buffer.length();
+ }
+
+ }else{
+
+ // Buffer is larger or smaller than our our threshold. Copy the data.
+ int length = buffer.length();
+ int input_offset = 0;
+ while (length > 0)
+ {
+ int b = writeOffset / CODAL_STREAM_RECORDING_BUFFER_SIZE;
+ int o = writeOffset % CODAL_STREAM_RECORDING_BUFFER_SIZE;
+ int l = min(length, CODAL_STREAM_RECORDING_BUFFER_SIZE - o);
+
+ if (b < CODAL_STREAM_RECORDING_SIZE)
+ {
+ // Allocate memory for the buffer if needed.
+ if (data[b].length() != CODAL_STREAM_RECORDING_BUFFER_SIZE){
+ data[b] = ManagedBuffer(CODAL_STREAM_RECORDING_BUFFER_SIZE);
+ }
+
+ // Copy in the data from the input buffer.
+ if (data[b].length() == CODAL_STREAM_RECORDING_BUFFER_SIZE)
+ {
+ uint8_t *dst = data[b].getBytes() + o;
+ uint8_t *src = buffer.getBytes() + input_offset;
+ memcpy(dst, src, l);
+
+ length -= l;
+ input_offset += l;
+ writeOffset += l;
+ totalBufferLength += l;
+ }else{
+ // We couldn't allocate the necessary buffer resources. Terminate early.
+ length = 0;
+ stop();
+ }
+ }
+ }
+ }
+ }
+ else
+ {
+ stop();
}
-
- this->stop();
- return DEVICE_NO_RESOURCES;
-}
-
-void StreamRecording::connect( DataSink &sink )
-{
- this->downStream = &sink;
-}
-
-bool StreamRecording::isConnected()
-{
- return this->downStream != NULL;
-}
-
-void StreamRecording::disconnect()
-{
- this->downStream = NULL;
-}
-
-int StreamRecording::getFormat()
-{
- return this->upStream.getFormat();
-}
-int StreamRecording::setFormat( int format )
-{
- return this->upStream.setFormat( format );
+ return DEVICE_OK;
}
-bool StreamRecording::recordAsync()
+int StreamRecording::recordAsync()
{
- // Duplicate check from within erase(), but here for safety in case of later code edits...
- if( this->state != REC_STATE_STOPPED )
- this->stop();
-
- erase();
-
- bool changed = this->state != REC_STATE_RECORDING;
+ // If we're already recording, then treat as a NOP.
+ if(state != REC_STATE_RECORDING)
+ {
+ // We could be playing back. If so, stop first and erase our buffer.
+ stop();
+ erase();
- this->state = REC_STATE_RECORDING;
+ state = REC_STATE_RECORDING;
+ dataWanted(DATASTREAM_WANTED);
+ }
- return changed;
+ return DEVICE_OK;
}
void StreamRecording::record()
{
recordAsync();
- while( isRecording() )
- fiber_sleep(5);
- printChain();
+ recordLock.wait();
}
void StreamRecording::erase()
@@ -194,68 +146,64 @@ void StreamRecording::erase()
if( this->state != REC_STATE_STOPPED )
this->stop();
- // Run down the chain, freeing as we go
- StreamRecording_Buffer * node = this->bufferChain;
- while( node != NULL ) {
- StreamRecording_Buffer * next = node->next;
- delete node;
- node = next;
- }
- initialise();
+ // Erase current buffer
+ for (int i = 0; i < CODAL_STREAM_RECORDING_SIZE; i++)
+ data[i] = ManagedBuffer();
+
+ // Set length
+ totalBufferLength = 0;
+ readOffset = 0;
+ writeOffset = 0;
}
-bool StreamRecording::playAsync()
+int StreamRecording::playAsync()
{
- if( this->state != REC_STATE_STOPPED )
- this->stop();
- bool changed = this->state != REC_STATE_PLAYING;
-
- this->state = REC_STATE_PLAYING;
- if( this->downStream != NULL )
- this->downStream->pullRequest();
+ if( this->state != REC_STATE_PLAYING )
+ {
+ if (this->state == REC_STATE_RECORDING)
+ stop();
- return changed;
+ this->state = REC_STATE_PLAYING;
+ readOffset = 0;
+
+ if( this->downStream != NULL )
+ this->downStream->pullRequest();
+ }
+
+ return DEVICE_OK;
}
void StreamRecording::play()
{
playAsync();
- while( isPlaying() )
- fiber_sleep(5);
+ playLock.wait();
}
-bool StreamRecording::stop()
+int StreamRecording::stop()
{
- bool changed = this->state != REC_STATE_STOPPED;
+ if (this->state != REC_STATE_STOPPED)
+ {
+ this->state = REC_STATE_STOPPED;
+ dataWanted(DATASTREAM_DONT_CARE);
+ recordLock.notifyAll();
+ }
- this->state = REC_STATE_STOPPED;
- this->readHead = this->bufferChain; // Snap to the start
+ this->readOffset = 0;
- return changed;
+ return DEVICE_OK;
}
bool StreamRecording::isPlaying()
{
- fiber_sleep(0);
return this->state == REC_STATE_PLAYING;
}
bool StreamRecording::isRecording()
{
- fiber_sleep(0);
return this->state == REC_STATE_RECORDING;
}
bool StreamRecording::isStopped()
{
- fiber_sleep(0);
return this->state == REC_STATE_STOPPED;
}
-
-float StreamRecording::getSampleRate()
-{
- if( this->lastUpstreamRate == DATASTREAM_SAMPLE_RATE_UNKNOWN )
- return this->upStream.getSampleRate();
-
- return this->lastUpstreamRate;
-}
\ No newline at end of file
diff --git a/source/streams/StreamSplitter.cpp b/source/streams/StreamSplitter.cpp
index 400b3dac..2d732f2e 100644
--- a/source/streams/StreamSplitter.cpp
+++ b/source/streams/StreamSplitter.cpp
@@ -31,127 +31,67 @@ DEALINGS IN THE SOFTWARE.
using namespace codal;
-SplitterChannel::SplitterChannel( StreamSplitter * parent, DataSink * output = NULL )
+SplitterChannel::SplitterChannel( StreamSplitter * parent, DataSink * output = NULL ) : DataSourceSink(*(new DataSource()))
{
- this->sampleRate = DATASTREAM_SAMPLE_RATE_UNKNOWN;
this->parent = parent;
- this->output = output;
- this->pullAttempts = 0;
- this->sentBuffers = 0;
- this->inUnderflow = 0;
+ this->downStream = output;
}
SplitterChannel::~SplitterChannel()
{
- //
}
-int SplitterChannel::pullRequest() {
- this->pullAttempts++;
- if( output != NULL )
- return output->pullRequest();
- return DEVICE_BUSY;
-}
-
-ManagedBuffer SplitterChannel::resample( ManagedBuffer _in, uint8_t * buffer, int length ) {
+ManagedBuffer SplitterChannel::resample( ManagedBuffer _in, uint8_t *buffer, int length ) {
- // Going the long way around - drop any extra samples...
- float inRate = parent->upstream.getSampleRate();
- float outRate = sampleRate;
-
+ // Fast path. Perform a shallow copy of the input buffer where possible.
+ // TODO: verify this is still a safe operation under all conditions.
+ if (this->sampleDropRate == 1)
+ return _in;
+
+ // Going the long way around - drop any excess samples...
int inFmt = parent->upstream.getFormat();
int bytesPerSample = DATASTREAM_FORMAT_BYTES_PER_SAMPLE( inFmt );
int totalSamples = _in.length() / bytesPerSample;
+ int numOutputSamples = (totalSamples / sampleDropRate) + 1;
+ uint8_t *outPtr = NULL;
- // Integer estimate the number of sample drops required
- int byteDeficit = (int)inRate - (int)outRate;
- int packetsPerSec = (int)inRate / totalSamples;
- int dropPerPacket = byteDeficit / packetsPerSec;
- int samplesPerOut = totalSamples - dropPerPacket;
-
- // If we're not supplied an external buffer, make our own...
- uint8_t * output = buffer;
- if( output == NULL ) {
- output = (uint8_t *)malloc( samplesPerOut * bytesPerSample );
- length = samplesPerOut * bytesPerSample;
- } else {
- if (length > samplesPerOut * bytesPerSample) {
- length = samplesPerOut * bytesPerSample;
- }
- }
-
- int oversample_offset = 0;
- int oversample_step = (totalSamples * CONFIG_SPLITTER_OVERSAMPLE_STEP) / samplesPerOut;
+ ManagedBuffer output = ManagedBuffer(numOutputSamples * bytesPerSample);
+ outPtr = output.getBytes();
- uint8_t *inPtr = &_in[0];
- uint8_t *outPtr = output;
- while( outPtr - output < length )
+ for (int i = 0; i < totalSamples * bytesPerSample; i++)
{
- int a = StreamNormalizer::readSample[inFmt]( inPtr + ((int)(oversample_offset / CONFIG_SPLITTER_OVERSAMPLE_STEP) * bytesPerSample) );
- int b = StreamNormalizer::readSample[inFmt]( inPtr + (((int)(oversample_offset / CONFIG_SPLITTER_OVERSAMPLE_STEP) + 1) * bytesPerSample) );
- int s = a + ((int)((b - a)/CONFIG_SPLITTER_OVERSAMPLE_STEP) * (oversample_offset % CONFIG_SPLITTER_OVERSAMPLE_STEP));
+ sampleSigma += StreamNormalizer::readSample[inFmt]( &_in[i]);
+ sampleDropPosition++;
- oversample_offset += oversample_step;
+ if (sampleDropPosition >= sampleDropRate)
+ {
+ StreamNormalizer::writeSample[inFmt](outPtr, sampleSigma / sampleDropRate);
+ outPtr += bytesPerSample;
- StreamNormalizer::writeSample[inFmt](outPtr, s);
- outPtr += bytesPerSample;
+ sampleDropPosition = 0;
+ sampleSigma = 0;
+ }
}
- ManagedBuffer result = ManagedBuffer( output, length );
-
- // Did we create this memory? If so, free it again.
- if( buffer == NULL )
- free( output );
+ output.truncate(outPtr - output.getBytes());
- return result;
+ return output;
}
uint8_t * SplitterChannel::pullInto( uint8_t * rawBuffer, int length )
{
- this->pullAttempts = 0;
- this->sentBuffers++;
ManagedBuffer inData = parent->getBuffer();
-
- // Shortcuts - we can't fabricate samples, so just pass on what we can if we don't know or can't keep up.
- if( this->sampleRate == DATASTREAM_SAMPLE_RATE_UNKNOWN || this->sampleRate >= this->parent->upstream.getSampleRate() ) {
- inData.readBytes( rawBuffer, 0, min(inData.length(), length) );
- return rawBuffer + min(inData.length(), length);
- }
-
ManagedBuffer result = this->resample( inData, rawBuffer, length );
+
return rawBuffer + result.length();
}
ManagedBuffer SplitterChannel::pull()
{
- this->pullAttempts = 0;
- this->sentBuffers++;
ManagedBuffer inData = parent->getBuffer();
-
- // Shortcuts - we can't fabricate samples, so just pass on what we can if we don't know or can't keep up.
- if( this->sampleRate == DATASTREAM_SAMPLE_RATE_UNKNOWN || this->sampleRate >= this->parent->upstream.getSampleRate() )
- return inData;
-
return this->resample( inData ); // Autocreate the output buffer
}
-void SplitterChannel::connect(DataSink &sink)
-{
- output = &sink;
- Event e( parent->id, SPLITTER_CHANNEL_CONNECT );
-}
-
-bool SplitterChannel::isConnected()
-{
- return this->output != NULL;
-}
-
-void SplitterChannel::disconnect()
-{
- output = NULL;
- Event e( parent->id, SPLITTER_CHANNEL_DISCONNECT );
-}
-
int SplitterChannel::getFormat()
{
return parent->upstream.getFormat();
@@ -162,33 +102,27 @@ int SplitterChannel::setFormat(int format)
return parent->upstream.setFormat( format );
}
-float SplitterChannel::getSampleRate()
+int SplitterChannel::requestSampleDropRate( int sampleDropRate )
{
- if( sampleRate != DATASTREAM_SAMPLE_RATE_UNKNOWN )
- return sampleRate;
- return parent->upstream.getSampleRate();
+ // TODO: Any validaiton to do here? Or do we permit any integer multiple?
+ this->sampleDropRate = sampleDropRate;
+ this->sampleDropPosition = 0;
+ this->sampleSigma = 0;
+
+ return this->sampleDropRate;
}
-float SplitterChannel::requestSampleRate( float sampleRate )
+void SplitterChannel::dataWanted(int wanted)
{
- this->sampleRate = sampleRate;
-
- // Do we need to request a higher rate upstream?
- if( parent->upstream.getSampleRate() < sampleRate ) {
-
- // Request it, and if we got less that we expected, report that rate
- if( parent->upstream.requestSampleRate( sampleRate ) < sampleRate )
- return parent->upstream.getSampleRate();
+ // Only pass along the requets if our status has changed.
+ if (wanted != DataSource::isWanted())
+ {
+ //DMESG("SplitterChannel[%p]: dataWanted: %d", this, wanted);
+ DataSource::dataWanted(wanted);
+ parent->dataWanted(wanted);
}
-
- // Otherwise, report our own rate (we're matching or altering it ourselves)
- return sampleRate;
}
-
-
-
-
/**
* Creates a component that distributes a single upstream datasource to many downstream datasinks
*
@@ -198,27 +132,35 @@ StreamSplitter::StreamSplitter(DataSource &source, uint16_t id) : upstream(sourc
{
this->id = id;
this->channels = 0;
- this->activeChannels = 0;
- this->isActive = false;
// init array to NULL.
for (int i = 0; i < CONFIG_MAX_CHANNELS; i++)
outputChannels[i] = NULL;
upstream.connect(*this);
-
- this->__cycle = 0;
- //this->status |= DEVICE_COMPONENT_STATUS_SYSTEM_TICK;
}
StreamSplitter::~StreamSplitter()
{
- // Nop.
+}
+
+void StreamSplitter::dataWanted(int wanted)
+{
+ // Determine if any of our active splitter channels require data.
+ int streamWanted = DATASTREAM_DONT_CARE;
+
+ for(int i=0; iisWanted() > streamWanted)
+ streamWanted = outputChannels[i]->isWanted();
+ }
+
+ return upstream.dataWanted(streamWanted);
}
ManagedBuffer StreamSplitter::getBuffer()
{
- if( lastBuffer == ManagedBuffer() )
+ if(lastBuffer == ManagedBuffer())
lastBuffer = upstream.pull();
return lastBuffer;
@@ -229,30 +171,15 @@ ManagedBuffer StreamSplitter::getBuffer()
*/
int StreamSplitter::pullRequest()
{
- activeChannels = 0;
-
// For each downstream channel that exists in array outputChannels - make a pullRequest
for (int i = 0; i < CONFIG_MAX_CHANNELS; i++)
{
- if (outputChannels[i] != NULL) {
- if( outputChannels[i]->pullRequest() == DEVICE_OK ) {
- activeChannels++;
-
- if( !isActive )
- Event e( id, SPLITTER_ACTIVATE );
- isActive = true;
- }
- }
+ if (outputChannels[i] != NULL)
+ outputChannels[i]->pullRequest();
}
- if( activeChannels == 0 && isActive ) {
- Event e( id, SPLITTER_DEACTIVATE );
- isActive = false;
- }
-
lastBuffer = ManagedBuffer();
- Event e( id, SPLITTER_TICK );
return DEVICE_BUSY;
}
@@ -269,6 +196,7 @@ SplitterChannel * StreamSplitter::createChannel()
break;
}
}
+
if(placed != -1) {
channels++;
return outputChannels[placed];
@@ -295,11 +223,15 @@ SplitterChannel * StreamSplitter::getChannel( DataSink * output ) {
{
if( outputChannels[i] != NULL )
{
- if( outputChannels[i]->output == output ) {
+ if( outputChannels[i]->downStream == output ) {
return outputChannels[i];
}
}
}
return NULL;
+}
+
+float SplitterChannel::getSampleRate() {
+ return parent->upstream.getSampleRate() / sampleDropRate;
}
\ No newline at end of file
diff --git a/source/streams/Synthesizer.cpp b/source/streams/Synthesizer.cpp
index 6497287e..a916a16d 100644
--- a/source/streams/Synthesizer.cpp
+++ b/source/streams/Synthesizer.cpp
@@ -122,7 +122,7 @@ Synthesizer::Synthesizer(int sampleRate, bool isSigned) : output(*this)
*/
void Synthesizer::idleCallback()
{
- if (bytesWritten && !synchronous && !active && output.canPull(bytesWritten))
+ if (bytesWritten && !synchronous && !active)
{
buffer.truncate(bytesWritten);
output.pullRequest();