diff --git a/catkin/CMakeLists.txt b/catkin/CMakeLists.txt index 1fa5817..f812ec5 100644 --- a/catkin/CMakeLists.txt +++ b/catkin/CMakeLists.txt @@ -48,6 +48,7 @@ add_library(${PROJECT_NAME} ../${PROJECT_NAME}/airtalgo.cpp ../${PROJECT_NAME}/Algo.cpp ../${PROJECT_NAME}/ChannelReorder.cpp + ../${PROJECT_NAME}/CircularBuffer.cpp ../${PROJECT_NAME}/EndPointCallback.cpp ../${PROJECT_NAME}/EndPoint.cpp ../${PROJECT_NAME}/EndPointRead.cpp diff --git a/drain/CircularBuffer.cpp b/drain/CircularBuffer.cpp new file mode 100644 index 0000000..78200e9 --- /dev/null +++ b/drain/CircularBuffer.cpp @@ -0,0 +1,295 @@ +/** @file + * @author Edouard DUPIN + * @copyright 2011, Edouard DUPIN, all right reserved + * @license APACHE v2.0 (see license file) + */ + +#include +#include + +drain::CircularBuffer::CircularBuffer(const drain::CircularBuffer& _obj) : + m_data(), + m_write(nullptr), + m_read(nullptr), + m_timeRead(), + m_capacity(0), + m_sizeChunk(0), + m_size(0) { + DRAIN_CRITICAL("error"); +}; +/** + * @brief copy operator. + */ +drain::CircularBuffer& drain::CircularBuffer::operator=(const drain::CircularBuffer& _obj) { + DRAIN_CRITICAL("error"); + return *this; +}; + +drain::CircularBuffer::CircularBuffer() : + m_data(), + m_write(nullptr), + m_read(nullptr), + m_timeRead(), + m_capacity(0), + m_sizeChunk(0), + m_size(0) { + // nothing to do ... +} + +drain::CircularBuffer::~CircularBuffer() { + m_data.clear(); + m_read = nullptr; + m_write = nullptr; +} + +void drain::CircularBuffer::setCapacity(size_t _capacity, size_t _chunkSize, uint32_t _frequency) { + if ( _chunkSize == m_sizeChunk + && _capacity == m_capacity) { + clear(); + return; + } + DRAIN_DEBUG("buffer setCapacity(" << _capacity << "," << _chunkSize << ")"); + if (_capacity == 0) { + DRAIN_ERROR("set a buffer capacity with 0 data ... (reset default at 4096)"); + _capacity = 4096; + } + if (_chunkSize == 0) { + DRAIN_ERROR("set a buffer capacity with chunksize = 0 ... (reset default at 8)"); + _chunkSize = 8; + } + m_data.clear(); + m_write = nullptr; + m_read = nullptr; + m_frequency = _frequency; + m_capacity = _capacity; + m_sizeChunk = _chunkSize; + m_size = 0; + if ( _capacity == 0 + || _chunkSize == 0) { + m_capacity = 0; + m_sizeChunk = 0; + return; + } + m_data.resize(m_capacity*m_sizeChunk, 0); + m_read = &m_data[0]; + m_write = &m_data[0]; +} + +void drain::CircularBuffer::setCapacity(std11::chrono::milliseconds _capacity, size_t _chunkSize, uint32_t _frequency) { + uint32_t nbSampleNeeded = _frequency*_capacity.count()/1000; + DRAIN_DEBUG("buffer setCapacity(" << _capacity.count() << "ms ," << _chunkSize << ")"); + setCapacity(nbSampleNeeded, _chunkSize, _frequency); +} + + +size_t drain::CircularBuffer::getUsedSizeBeforEnd() const { + size_t size; + if (m_read < m_write) { + size = static_cast(m_write) - static_cast(m_read); + // the size result is in bytes we need to have it in element + size /= m_sizeChunk; + } else if ( m_read == m_write + && m_size == 0) { + // no element in the buffer + size = 0; + } else { + size = &m_data[0] + (m_capacity*m_sizeChunk) - static_cast(m_read); + // the size result is in bytes we need to have it in element + size /= m_sizeChunk; + } + return size; +} + +size_t drain::CircularBuffer::getFreeSizeBeforEnd() const { + size_t size; + size = &m_data[0] + + (m_capacity*m_sizeChunk) + - static_cast(m_write); + // the size result is in Octet we need to have it in element + size /= m_sizeChunk; + return size; +} +size_t drain::CircularBuffer::write(const void* _data, size_t _nbChunk) { + return write(_data, _nbChunk, std11::chrono::system_clock::now());//m_timeRead + ) +} + +size_t drain::CircularBuffer::write(const void* _data, size_t _nbChunk, const std11::chrono::system_clock::time_point& _time) { + if (m_data.size() == 0) { + DRAIN_ERROR("EMPTY Buffer"); + return _nbChunk; + } + size_t nbElementDrop = 0; + size_t freeSizeBeforeEnd = getFreeSizeBeforEnd(); + size_t freeSize = m_capacity - m_size; + // Write element in all case + // calculate the number of element that are overwritten + if (freeSize < _nbChunk) { + nbElementDrop = _nbChunk - freeSize; + } + // if User Request a write more important than the size of the buffer ==> update the pointer to feet only on the buffer size + if (m_capacity < _nbChunk) { + DRAIN_WARNING("CircularBuffer Write too BIG " << _nbChunk << " buffer max size : " << m_capacity << " (keep last Elements)"); + // Move data pointer + _data = static_cast(_data) + (_nbChunk - m_capacity) * m_sizeChunk; + // update size + _nbChunk = m_capacity; + } + // if no element in the FIFO ==> first time write or no more data inside ==> start set the file of the read data ... + if (m_size == 0) { + m_timeRead = _time; + } + // TODO : Check time to push continuous data ... + + if (freeSizeBeforeEnd >= _nbChunk) { + // all Data will be copy + memcpy(m_write, _data, _nbChunk * m_sizeChunk); + // update Writing pointer + if (freeSizeBeforeEnd == _nbChunk) { + // update to the end of FIFO ==> update to the start + m_write = &m_data[0]; + } else { + m_write = static_cast(m_write) + _nbChunk * m_sizeChunk; + } + // Update the number of element in the buffer. + m_size += _nbChunk; + } else { + // copy data to the end of buffer + memcpy(m_write, _data, freeSizeBeforeEnd * m_sizeChunk); + // update Writing pointer ==> end of buffer ==> go to the start + m_write = &m_data[0]; + // update data pointer + _data = static_cast(_data) + freeSizeBeforeEnd * m_sizeChunk; + m_size += freeSizeBeforeEnd; + // get the number of element we need to write + _nbChunk -= freeSizeBeforeEnd; + // Copy the las data if needed + if (_nbChunk != 0) { + memcpy(m_write, _data, _nbChunk * m_sizeChunk); + // update Writing pointer + m_write = static_cast(m_write) + _nbChunk * m_sizeChunk; + m_size += _nbChunk; + } + } + if (nbElementDrop > 0) { + // if drop element we need to update the reading pointer + m_read = m_write; + m_size = m_capacity; + } + // return the number of element Overwrite + return nbElementDrop; +} + +size_t drain::CircularBuffer::read(void* _data, size_t _nbChunk) { + return read(_data, _nbChunk, m_timeRead); +} + +size_t drain::CircularBuffer::read(void* _data, size_t _nbChunk, const std11::chrono::system_clock::time_point& _time) { + size_t nbElementDrop = 0; + // Critical section (theoriquely protected by Mutex) + size_t usedSizeBeforeEnd = getUsedSizeBeforEnd(); + // verify if we have elements in the Buffer + if (0 < m_size) { + // check the time of the read : + std11::chrono::nanoseconds deltaTime = m_timeRead - _time; + if (deltaTime.count() == 0) { + // nothing to do ==> just copy data ... + } else if (deltaTime.count() > 0) { + // Add empty sample in the output buffer ... + size_t nbSampleEmpty = m_frequency*deltaTime.count()/100000000; + nbSampleEmpty = std::min(nbSampleEmpty, _nbChunk); + DRAIN_WARNING("add Empty sample in the output buffer " << nbSampleEmpty << " / " << _nbChunk); + memset(_data, 0, nbSampleEmpty * m_sizeChunk); + if (nbSampleEmpty == _nbChunk) { + return 0; + } + _nbChunk -= nbSampleEmpty; + } else { + // Remove data from the FIFO + setReadPosition(_time); + } + if (m_size < _nbChunk) { + nbElementDrop = _nbChunk - m_size; + DRAIN_VERBOSE("crop nb sample : m_size=" << m_size << " _nbChunk=" << _nbChunk); + _nbChunk = m_size; + } + m_timeRead += std11::chrono::microseconds(_nbChunk*1000000/m_frequency); + if (usedSizeBeforeEnd >= _nbChunk) { + // all Data will be copy + memcpy(_data, m_read, _nbChunk * m_sizeChunk); + // update Writing pointer + m_read = static_cast(m_read) + _nbChunk * m_sizeChunk; + m_size -= _nbChunk; + // update output pointer in case of flush with 0 data + _data = static_cast(_data) + _nbChunk * m_sizeChunk; + } else { + // copy data to the end of buffer + memcpy(_data, m_read, usedSizeBeforeEnd * m_sizeChunk); + // update Writing pointer ==> end of buffer ==> go to the start + m_read = &m_data[0]; + _data = static_cast(_data) + usedSizeBeforeEnd * m_sizeChunk; + m_size -= usedSizeBeforeEnd; + // get the number of element we need to write + _nbChunk -= usedSizeBeforeEnd; + // Copy the last data if needed + if (0 != _nbChunk) { + memcpy(_data, m_read, _nbChunk * m_sizeChunk); + // update Writing pointer + m_read = static_cast(m_read) + _nbChunk * m_sizeChunk; + m_size -= _nbChunk; + // update output pointer in case of flush with 0 data + _data = static_cast(_data) + _nbChunk * m_sizeChunk; + } + } + } else { + nbElementDrop = _nbChunk; + } + if (0 < nbElementDrop) { + // set 0 in last element of the output + memset(_data, 0, m_sizeChunk * nbElementDrop); + } + // return the number of element droped + return nbElementDrop; +} + +void drain::CircularBuffer::setReadPosition(const std11::chrono::system_clock::time_point& _time) { + // Critical section (theoriquely protected by Mutex) + size_t usedSizeBeforeEnd = getUsedSizeBeforEnd(); + if (0 < m_size) { + // check the time of the read : + std11::chrono::nanoseconds deltaTime = _time - m_timeRead; + size_t nbSampleToRemove = int64_t(m_frequency)*int64_t(deltaTime.count())/1000000000LL; + nbSampleToRemove = std::min(nbSampleToRemove, m_size); + DRAIN_VERBOSE("Remove sample in the buffer " << nbSampleToRemove << " / " << m_size); + std11::chrono::nanoseconds updateTime((int64_t(nbSampleToRemove)*1000000000LL)/int64_t(m_frequency)); + DRAIN_VERBOSE(" add time : " << updateTime.count() << "ns / " << deltaTime.count() << "ns"); + if (usedSizeBeforeEnd >= nbSampleToRemove) { + usedSizeBeforeEnd -= nbSampleToRemove; + m_size -= nbSampleToRemove; + m_read = static_cast(m_read) + nbSampleToRemove * m_sizeChunk; + } else { + nbSampleToRemove -= usedSizeBeforeEnd; + m_size -= nbSampleToRemove; + m_read = &m_data[0] + nbSampleToRemove*m_sizeChunk; + } + m_timeRead += updateTime; + //m_timeRead += deltaTime; + } else { + m_timeRead = std11::chrono::system_clock::time_point(); + } +} + + +size_t drain::CircularBuffer::getFreeSize() const { + return m_capacity - m_size; +} + +void drain::CircularBuffer::clear() { + DRAIN_DEBUG("buffer clear()"); + // set pointer to the start + m_read = &m_data[0]; + m_write = &m_data[0]; + // Clean the number of element in the buffer + m_size = 0; + // Clean all element inside : + memset(&m_data[0], 0, m_sizeChunk * m_capacity); +} diff --git a/drain/CircularBuffer.h b/drain/CircularBuffer.h new file mode 100644 index 0000000..6fd05bb --- /dev/null +++ b/drain/CircularBuffer.h @@ -0,0 +1,143 @@ +/** @file + * @author Edouard DUPIN + * @copyright 2011, Edouard DUPIN, all right reserved + * @license APACHE v2.0 (see license file) + */ + + +#ifndef __DRAIN_CIRCULAR_BUFFER_H__ +#define __DRAIN_CIRCULAR_BUFFER_H__ + +#include +#include +#include + +namespace drain { + /** + * For these functions we have 4 solutions : + * - Free Buffer + * ---------------------------------------------------------- + * m_data | | | + * ---------------------------------------------------------- + * m_write + * m_read + * - Full Buffer + * ---------------------------------------------------------- + * m_data |****************************|***************************| + * ---------------------------------------------------------- + * m_write + * m_read + * - Buffer in used + * ---------------------------------------------------------- + * m_data | |********************| | + * ---------------------------------------------------------- + * m_read m_write + * - Buffer out used + * ---------------------------------------------------------- + * m_data |****************| |******************| + * ---------------------------------------------------------- + * m_read m_write + */ + class CircularBuffer { + private: + std::vector m_data; //!< data pointer + void* m_write; //!< write pointer + void* m_read; //!< read pointer + std11::chrono::system_clock::time_point m_timeRead; //!< current read time + uint32_t m_frequency; + // TODO : Remove the m_size ==> this is a bad element to be mutex-less + size_t m_size; //!< number of chunk availlable in this buffer + size_t m_capacity; //!< number of chunk available in this Buffer + size_t m_sizeChunk; //!< Size of one chunk (in byte) + public: + CircularBuffer(); + ~CircularBuffer(); + /** + * @brief copy contructor. + * @param[in] _obj Circular buffer object + */ + CircularBuffer(const drain::CircularBuffer& _obj); + /** + * @brief copy operator. + * @param[in] _obj Circular buffer object + */ + CircularBuffer& operator=(const drain::CircularBuffer& _obj); + /** + * @brief set the capacity of the circular buffer. + * @param[in] _capacity Number of chunk in the buffer. + * @param[in] _chunkSize Size of one chunk. + * @param[in] _frequency Frequency of the buffer + */ + void setCapacity(size_t _capacity, size_t _chunkSize, uint32_t _frequency); + /** + * @brief set the capacity of the circular buffer. + * @param[in] _capacity time in millisecond stored in the buffer. + * @param[in] _chunkSize Size of one chunk. + * @param[in] _frequency Frequency of the buffer + */ + void setCapacity(std11::chrono::milliseconds _capacity, size_t _chunkSize, uint32_t _frequency); + void setCapacity(std11::chrono::microseconds _capacity, size_t _chunkSize, uint32_t _frequency) { + setCapacity(std11::chrono::milliseconds(_capacity.count()/1000), _chunkSize, _frequency); + } + /** + * @brief get free size of the buffer. + * @return Number of free chunk. + */ + size_t getFreeSize() const; + /** + * @brief Get number of chunk in the buffer. + * @return number of chunk. + */ + size_t getSize() const { + return m_size; + } + /** + * @brief Get number of chunk that can be set in the buffer. + * @return number of chunk. + */ + size_t getCapacity() const { + return m_capacity; + } + /** + * @brief Write chunk in the buffer. + * @param[in] _data Pointer on the data. + * @param[in] _nbChunk number of chunk to copy. + * @param[in] _time Time to start write data (if before end ==> not replace data, write only if after end) + * @return Number of chunk copied. + */ + size_t write(const void* _data, size_t _nbChunk, const std11::chrono::system_clock::time_point& _time); + size_t write(const void* _data, size_t _nbChunk); + /** + * @brief Read Chunk from the buffer to the pointer data. + * @param[out] _data Pointer on the data. + * @param[in] _nbChunk number of chunk to copy. + * @param[in] _time Time to start read data (if before start ==> add 0 at start, if after, remove unread data) + * @return Number of chunk copied. + */ + size_t read(void* _data, size_t _nbChunk, const std11::chrono::system_clock::time_point& _time); + //! @previous + size_t read(void* _data, size_t _nbChunk); + void setReadPosition(const std11::chrono::system_clock::time_point& _time); + + std11::chrono::system_clock::time_point getReadTimeStamp() { + return m_timeRead; + } + /** + * @brief Clear the buffer. + */ + void clear(); + private: + /** + * @brief Get number of free chunks before end of buffer. + * @return Number of chunk. + */ + size_t getFreeSizeBeforEnd() const; + /** + * @brief Get number of used chunks before end of buffer. + * @return Number of chunk. + */ + size_t getUsedSizeBeforEnd() const; + }; +} + +#endif \ No newline at end of file diff --git a/drain/EndPointRead.cpp b/drain/EndPointRead.cpp index 5b94ad1..9a95c41 100644 --- a/drain/EndPointRead.cpp +++ b/drain/EndPointRead.cpp @@ -33,11 +33,39 @@ void drain::EndPointRead::configurationChange() { bool drain::EndPointRead::process(std11::chrono::system_clock::time_point& _time, - void* _input, - size_t _inputNbChunk, - void*& _output, - size_t& _outputNbChunk){ + void* _input, + size_t _inputNbChunk, + void*& _output, + size_t& _outputNbChunk){ drain::AutoLogInOut tmpLog("EndPointRead"); return false; } +void drain::EndPointRead::setBufferSize(size_t _nbChunk) { + DRAIN_TODO("..."); +} + +void drain::EndPointRead::setBufferSize(const std11::chrono::microseconds& _time) { + DRAIN_TODO("..."); +} + +size_t drain::EndPointRead::getBufferSize() { + DRAIN_TODO("..."); + return 0; +} + +std11::chrono::microseconds drain::EndPointRead::getBufferSizeMicrosecond() { + DRAIN_TODO("..."); + return std11::chrono::microseconds(0); +} + +size_t drain::EndPointRead::getBufferFillSize() { + DRAIN_TODO("..."); + return 0; +} + +std11::chrono::microseconds drain::EndPointRead::getBufferFillSizeMicrosecond() { + DRAIN_TODO("..."); + return std11::chrono::microseconds(0); +} + diff --git a/drain/EndPointRead.h b/drain/EndPointRead.h index c319f3c..37530fc 100644 --- a/drain/EndPointRead.h +++ b/drain/EndPointRead.h @@ -29,6 +29,36 @@ namespace drain{ size_t _inputNbChunk, void*& _output, size_t& _outputNbChunk); + /** + * @brief Set buffer size in chunk number + * @param[in] _nbChunk Number of chunk in the buffer + */ + virtual void setBufferSize(size_t _nbChunk); + /** + * @brief Set buffer size size of the buffer with the stored time in µs + * @param[in] _time Time in microsecond of the buffer + */ + virtual void setBufferSize(const std11::chrono::microseconds& _time); + /** + * @brief get buffer size in chunk number + * @return Number of chunk that can be written in the buffer + */ + virtual size_t getBufferSize(); + /** + * @brief Set buffer size size of the buffer with the stored time in µs + * @return Time in microsecond that can be written in the buffer + */ + virtual std11::chrono::microseconds getBufferSizeMicrosecond(); + /** + * @brief Get buffer size filled in chunk number + * @return Number of chunk in the buffer (that might be read/write) + */ + virtual size_t getBufferFillSize(); + /** + * @brief Set buffer size size of the buffer with the stored time in µs + * @return Time in microsecond of the buffer (that might be read/write) + */ + virtual std11::chrono::microseconds getBufferFillSizeMicrosecond(); }; }; diff --git a/drain/EndPointWrite.cpp b/drain/EndPointWrite.cpp index 887fb26..d9054d4 100644 --- a/drain/EndPointWrite.cpp +++ b/drain/EndPointWrite.cpp @@ -11,13 +11,20 @@ #define __class__ "EndPointWrite" drain::EndPointWrite::EndPointWrite() : - m_function(nullptr) { + m_function(nullptr), + m_bufferSizeMicroseconds(1000000) { } void drain::EndPointWrite::init() { drain::EndPoint::init(); m_type = "EndPoint"; + if ( audio::getFormatBytes(m_output.getFormat())*m_output.getMap().size() != 0 + && m_output.getFrequency() != 0) { + m_buffer.setCapacity(m_bufferSizeMicroseconds, + audio::getFormatBytes(m_output.getFormat())*m_output.getMap().size(), + m_output.getFrequency()); + } } std11::shared_ptr drain::EndPointWrite::create() { @@ -28,6 +35,19 @@ std11::shared_ptr drain::EndPointWrite::create() { void drain::EndPointWrite::configurationChange() { drain::EndPoint::configurationChange(); + // update the buffer size ... + if ( audio::getFormatBytes(m_output.getFormat())*m_output.getMap().size() != 0 + && m_output.getFrequency() != 0) { + if (std11::chrono::microseconds(0) != m_bufferSizeMicroseconds) { + m_buffer.setCapacity(m_bufferSizeMicroseconds, + audio::getFormatBytes(m_output.getFormat())*m_output.getMap().size(), + m_output.getFrequency()); + } else { + m_buffer.setCapacity(m_bufferSizeChunk, + audio::getFormatBytes(m_output.getFormat())*m_output.getMap().size(), + m_output.getFrequency()); + } + } m_needProcess = true; } @@ -38,9 +58,10 @@ bool drain::EndPointWrite::process(std11::chrono::system_clock::time_point& _tim void*& _output, size_t& _outputNbChunk){ drain::AutoLogInOut tmpLog("EndPointWrite"); - //DRAIN_INFO(" nb Sample in buffer : " << m_tmpData.size()); + //DRAIN_INFO(" nb Sample in buffer : " << m_buffer.size()); if (m_function != nullptr) { - if (m_tmpData.size() <= 20000) { + // TODO : Rework this ... + if (m_buffer.getSize() <= 20000) { m_function(_time, _inputNbChunk, m_output.getFormat(), m_output.getFrequency(), m_output.getMap()); } } @@ -52,21 +73,21 @@ bool drain::EndPointWrite::process(std11::chrono::system_clock::time_point& _tim _output = &m_outputData[0]; std11::unique_lock lock(m_mutex); // check if data in the tmpBuffer - if (m_tmpData.size() == 0) { + if (m_buffer.getSize() == 0) { DRAIN_WARNING("No data in the user buffer (write null data ... " << _outputNbChunk << " chunks)"); // just send no data ... return true; } DRAIN_INFO("Write " << _outputNbChunk << " chunks"); // check if we have enought data: - int32_t nbChunkToCopy = std::min(_inputNbChunk, m_tmpData.size()/(m_output.getMap().size()*m_formatSize)); + int32_t nbChunkToCopy = std::min(_inputNbChunk, m_buffer.getSize()); - DRAIN_INFO(" " << nbChunkToCopy << " chunks ==> " << nbChunkToCopy*m_output.getMap().size()*m_formatSize << " Byte sizeBuffer=" << m_tmpData.size()); + DRAIN_INFO(" " << nbChunkToCopy << " chunks ==> " << nbChunkToCopy*m_output.getMap().size()*m_formatSize << " Byte sizeBuffer=" << m_buffer.getSize()); // copy data to the output: - memcpy(_output, &m_tmpData[0], nbChunkToCopy*m_output.getMap().size()*m_formatSize); - // remove old data: - m_tmpData.erase(m_tmpData.begin(), m_tmpData.begin() + nbChunkToCopy*m_output.getMap().size()*m_formatSize); - //DRAIN_INFO(" nb Sample in buffer : " << m_tmpData.size()); + int32_t nbUnderflow = m_buffer.read(_output, nbChunkToCopy); + if (nbUnderflow != 0) { + DRAIN_WARNING("Undeflow in FIFO ..."); + } return true; } @@ -75,9 +96,50 @@ void drain::EndPointWrite::write(const void* _value, size_t _nbChunk) { DRAIN_INFO("[ASYNC] Write data : " << _nbChunk << " chunks" << " ==> " << _nbChunk*m_output.getMap().size() << " samples" << " formatSize=" << int32_t(m_formatSize) - << " bufferSize=" << m_tmpData.size()); - const int8_t* value = static_cast(_value); - for (size_t iii=0; iii<_nbChunk*m_formatSize*m_output.getMap().size(); ++iii) { - m_tmpData.push_back(*value++); + << " bufferSize=" << m_buffer.getSize()); + int32_t nbOverflow = m_buffer.write(_value, _nbChunk); + if (nbOverflow > 0) { + DRAIN_ERROR("Overflow in output buffer : " << nbOverflow << " / " << _nbChunk); } } + +void drain::EndPointWrite::setBufferSize(size_t _nbChunk) { + m_bufferSizeMicroseconds = std11::chrono::microseconds(0); + m_bufferSizeChunk = _nbChunk; + if ( audio::getFormatBytes(m_output.getFormat())*m_output.getMap().size() != 0 + && m_output.getFrequency() != 0) { + m_buffer.setCapacity(_nbChunk, + audio::getFormatBytes(m_output.getFormat())*m_output.getMap().size(), + float(m_output.getFrequency())); + } +} + +void drain::EndPointWrite::setBufferSize(const std11::chrono::microseconds& _time) { + m_bufferSizeMicroseconds = _time; + m_bufferSizeChunk = 0; + m_buffer.setCapacity(_time, + audio::getFormatBytes(m_output.getFormat())*m_output.getMap().size(), + float(m_output.getFrequency())); +} + +size_t drain::EndPointWrite::getBufferSize() { + if (m_bufferSizeChunk != 0) { + return m_bufferSizeChunk; + } + return (int64_t(m_output.getFrequency())*m_bufferSizeMicroseconds.count())/1000000LL; +} + +std11::chrono::microseconds drain::EndPointWrite::getBufferSizeMicrosecond() { + if (m_bufferSizeMicroseconds != std11::chrono::microseconds(0) ) { + return m_bufferSizeMicroseconds; + } + return std11::chrono::microseconds(m_bufferSizeChunk*1000000LL/int64_t(m_output.getFrequency())); +} + +size_t drain::EndPointWrite::getBufferFillSize() { + return m_buffer.getSize()/(audio::getFormatBytes(m_output.getFormat())*m_output.getMap().size()); +} + +std11::chrono::microseconds drain::EndPointWrite::getBufferFillSizeMicrosecond() { + return std11::chrono::microseconds(getBufferFillSize()*1000000LL/int64_t(m_output.getFrequency())); +} diff --git a/drain/EndPointWrite.h b/drain/EndPointWrite.h index dbca436..e83c4b0 100644 --- a/drain/EndPointWrite.h +++ b/drain/EndPointWrite.h @@ -10,6 +10,7 @@ #include #include #include +#include namespace drain{ typedef std11::function& _map)> playbackFunctionWrite; class EndPointWrite : public EndPoint { private: - std::vector m_tmpData; + drain::CircularBuffer m_buffer; playbackFunctionWrite m_function; std11::mutex m_mutex; protected: @@ -44,6 +45,40 @@ namespace drain{ virtual void setCallback(playbackFunctionWrite _function) { m_function = _function; } + protected: + std11::chrono::microseconds m_bufferSizeMicroseconds; // 0 if m_bufferSizeChunk != 0 + size_t m_bufferSizeChunk; // 0 if m_bufferSizeMicroseconds != 0 + public: + /** + * @brief Set buffer size in chunk number + * @param[in] _nbChunk Number of chunk in the buffer + */ + virtual void setBufferSize(size_t _nbChunk); + /** + * @brief Set buffer size size of the buffer with the stored time in µs + * @param[in] _time Time in microsecond of the buffer + */ + virtual void setBufferSize(const std11::chrono::microseconds& _time); + /** + * @brief get buffer size in chunk number + * @return Number of chunk that can be written in the buffer + */ + virtual size_t getBufferSize(); + /** + * @brief Set buffer size size of the buffer with the stored time in µs + * @return Time in microsecond that can be written in the buffer + */ + virtual std11::chrono::microseconds getBufferSizeMicrosecond(); + /** + * @brief Get buffer size filled in chunk number + * @return Number of chunk in the buffer (that might be read/write) + */ + virtual size_t getBufferFillSize(); + /** + * @brief Set buffer size size of the buffer with the stored time in µs + * @return Time in microsecond of the buffer (that might be read/write) + */ + virtual std11::chrono::microseconds getBufferFillSizeMicrosecond(); }; }; diff --git a/drain/IOFormatInterface.cpp b/drain/IOFormatInterface.cpp index 596f837..7510e92 100644 --- a/drain/IOFormatInterface.cpp +++ b/drain/IOFormatInterface.cpp @@ -116,3 +116,8 @@ void drain::IOFormatInterface::configurationChange() { void drain::IOFormatInterface::setCallback(const std11::function& _functor) { m_ioChangeFunctor = _functor; } + + +int32_t drain::IOFormatInterface::getChunkSize() const { + return m_map.size() * audio::getFormatBytes(m_format); +} diff --git a/drain/IOFormatInterface.h b/drain/IOFormatInterface.h index 4099377..d9d3171 100644 --- a/drain/IOFormatInterface.h +++ b/drain/IOFormatInterface.h @@ -69,6 +69,12 @@ namespace drain{ * @param[in] _value New frequency. */ void setFrequency(float _value); + public: + /** + * @brief Get the Chunk size in byte. + * @return the number of byte used by chunk. + */ + int32_t getChunkSize() const; protected: std11::function m_ioChangeFunctor; //!< function pointer on the upper class void configurationChange(); diff --git a/lutin_drain.py b/lutin_drain.py index 8ff4be0..8d5f61b 100644 --- a/lutin_drain.py +++ b/lutin_drain.py @@ -15,6 +15,7 @@ def create(target): 'drain/airtalgo.cpp', 'drain/Algo.cpp', 'drain/ChannelReorder.cpp', + 'drain/CircularBuffer.cpp', 'drain/EndPointCallback.cpp', 'drain/EndPoint.cpp', 'drain/EndPointRead.cpp',