diff --git a/catkin/CMakeLists.txt b/catkin/CMakeLists.txt index bcfa806..6cb4bcb 100644 --- a/catkin/CMakeLists.txt +++ b/catkin/CMakeLists.txt @@ -46,7 +46,6 @@ add_library(${PROJECT_NAME} ../${PROJECT_NAME}/river.cpp ../${PROJECT_NAME}/Manager.cpp ../${PROJECT_NAME}/Interface.cpp - ../${PROJECT_NAME}/CircularBuffer.cpp ../${PROJECT_NAME}/io/Group.cpp ../${PROJECT_NAME}/io/Node.cpp ../${PROJECT_NAME}/io/NodeAirTAudio.cpp diff --git a/lutin_river.py b/lutin_river.py index 3505dea..e2645fd 100644 --- a/lutin_river.py +++ b/lutin_river.py @@ -15,7 +15,6 @@ def create(target): 'river/river.cpp', 'river/Manager.cpp', 'river/Interface.cpp', - 'river/CircularBuffer.cpp', 'river/io/Group.cpp', 'river/io/Node.cpp', 'river/io/NodeAirTAudio.cpp', diff --git a/river/CircularBuffer.cpp b/river/CircularBuffer.cpp deleted file mode 100644 index a24ef9f..0000000 --- a/river/CircularBuffer.cpp +++ /dev/null @@ -1,279 +0,0 @@ -/** @file - * @author Edouard DUPIN - * @copyright 2011, Edouard DUPIN, all right reserved - * @license APACHE v2.0 (see license file) - */ - -#include -#include - -river::CircularBuffer::CircularBuffer(const river::CircularBuffer& _obj) : - m_data(), - m_write(nullptr), - m_read(nullptr), - m_timeRead(), - m_capacity(0), - m_sizeChunk(0), - m_size(0) { - RIVER_CRITICAL("error"); -}; -/** - * @brief copy operator. - */ -river::CircularBuffer& river::CircularBuffer::operator=(const river::CircularBuffer& _obj) { - RIVER_CRITICAL("error"); - return *this; -}; - -river::CircularBuffer::CircularBuffer() : - m_data(), - m_write(nullptr), - m_read(nullptr), - m_timeRead(), - m_capacity(0), - m_sizeChunk(0), - m_size(0) { - // nothing to do ... -} - -river::CircularBuffer::~CircularBuffer() { - m_data.clear(); - m_read = nullptr; - m_write = nullptr; -} - -void river::CircularBuffer::setCapacity(size_t _capacity, size_t _chunkSize, uint32_t _frequency) { - if ( _chunkSize == m_sizeChunk - && _capacity == m_capacity) { - clear(); - return; - } - RIVER_DEBUG("buffer setCapacity(" << _capacity << "," << _chunkSize << ")"); - m_data.clear(); - m_write = nullptr; - m_read = nullptr; - m_frequency = _frequency; - m_capacity = _capacity; - m_sizeChunk = _chunkSize; - m_size = 0; - if ( _capacity == 0 - || _chunkSize == 0) { - m_capacity = 0; - m_sizeChunk = 0; - return; - } - m_data.resize(m_capacity*m_sizeChunk, 0); - m_read = &m_data[0]; - m_write = &m_data[0]; -} - -void river::CircularBuffer::setCapacity(std11::chrono::milliseconds _capacity, size_t _chunkSize, uint32_t _frequency) { - uint32_t nbSampleNeeded = _frequency*_capacity.count()/1000; - RIVER_DEBUG("buffer setCapacity(" << _capacity.count() << "ms ," << _chunkSize << ")"); - setCapacity(nbSampleNeeded, _chunkSize, _frequency); -} - - -size_t river::CircularBuffer::getUsedSizeBeforEnd() const { - size_t size; - if (m_read < m_write) { - size = static_cast(m_write) - static_cast(m_read); - // the size result is in bytes we need to have it in element - size /= m_sizeChunk; - } else if ( m_read == m_write - && m_size == 0) { - // no element in the buffer - size = 0; - } else { - size = &m_data[0] + (m_capacity*m_sizeChunk) - static_cast(m_read); - // the size result is in bytes we need to have it in element - size /= m_sizeChunk; - } - return size; -} - -size_t river::CircularBuffer::getFreeSizeBeforEnd() const { - size_t size; - size = &m_data[0] - + (m_capacity*m_sizeChunk) - - static_cast(m_write); - // the size result is in Octet we need to have it in element - size /= m_sizeChunk; - return size; -} - -size_t river::CircularBuffer::write(const void* _data, size_t _nbChunk, const std11::chrono::system_clock::time_point& _time) { - size_t nbElementDrop = 0; - size_t freeSizeBeforeEnd = getFreeSizeBeforEnd(); - size_t freeSize = m_capacity - m_size; - // Write element in all case - // calculate the number of element that are overwritten - if (freeSize < _nbChunk) { - nbElementDrop = _nbChunk - freeSize; - } - // if User Request a write more important than the size of the buffer ==> update the pointer to feet only on the buffer size - if (m_capacity < _nbChunk) { - RIVER_WARNING("CircularBuffer Write too BIG " << _nbChunk << " buffer max size : " << m_capacity << " (keep last Elements)"); - // Move data pointer - _data = static_cast(_data) + (_nbChunk - m_capacity) * m_sizeChunk; - // update size - _nbChunk = m_capacity; - } - // if no element in the FIFO ==> first time write or no more data inside ==> start set the file of the read data ... - if (m_size == 0) { - m_timeRead = _time; - } - // TODO : Check time to push continuous data ... - - if (freeSizeBeforeEnd >= _nbChunk) { - // all Data will be copy - memcpy(m_write, _data, _nbChunk * m_sizeChunk); - // update Writing pointer - if (freeSizeBeforeEnd == _nbChunk) { - // update to the end of FIFO ==> update to the start - m_write = &m_data[0]; - } else { - m_write = static_cast(m_write) + _nbChunk * m_sizeChunk; - } - // Update the number of element in the buffer. - m_size += _nbChunk; - } else { - // copy data to the end of buffer - memcpy(m_write, _data, freeSizeBeforeEnd * m_sizeChunk); - // update Writing pointer ==> end of buffer ==> go to the start - m_write = &m_data[0]; - // update data pointer - _data = static_cast(_data) + freeSizeBeforeEnd * m_sizeChunk; - m_size += freeSizeBeforeEnd; - // get the number of element we need to write - _nbChunk -= freeSizeBeforeEnd; - // Copy the las data if needed - if (_nbChunk != 0) { - memcpy(m_write, _data, _nbChunk * m_sizeChunk); - // update Writing pointer - m_write = static_cast(m_write) + _nbChunk * m_sizeChunk; - m_size += _nbChunk; - } - } - if (nbElementDrop > 0) { - // if drop element we need to update the reading pointer - m_read = m_write; - m_size = m_capacity; - } - // return the number of element Overwrite - return nbElementDrop; -} - -size_t river::CircularBuffer::read(void* _data, size_t _nbChunk) { - return read(_data, _nbChunk, m_timeRead); -} - -size_t river::CircularBuffer::read(void* _data, size_t _nbChunk, const std11::chrono::system_clock::time_point& _time) { - size_t nbElementDrop = 0; - // Critical section (theoriquely protected by Mutex) - size_t usedSizeBeforeEnd = getUsedSizeBeforEnd(); - // verify if we have elements in the Buffer - if (0 < m_size) { - // check the time of the read : - std11::chrono::nanoseconds deltaTime = m_timeRead - _time; - if (deltaTime.count() == 0) { - // nothing to do ==> just copy data ... - } else if (deltaTime.count() > 0) { - // Add empty sample in the output buffer ... - size_t nbSampleEmpty = m_frequency*deltaTime.count()/100000000; - nbSampleEmpty = std::min(nbSampleEmpty, _nbChunk); - RIVER_WARNING("add Empty sample in the output buffer " << nbSampleEmpty << " / " << _nbChunk); - memset(_data, 0, nbSampleEmpty * m_sizeChunk); - if (nbSampleEmpty == _nbChunk) { - return 0; - } - _nbChunk -= nbSampleEmpty; - } else { - // Remove data from the FIFO - setReadPosition(_time); - } - if (m_size < _nbChunk) { - nbElementDrop = _nbChunk - m_size; - _nbChunk = m_size; - } - m_timeRead += std11::chrono::microseconds(_nbChunk*1000000/m_frequency); - if (usedSizeBeforeEnd >= _nbChunk) { - // all Data will be copy - memcpy(_data, m_read, _nbChunk * m_sizeChunk); - // update Writing pointer - m_read = static_cast(m_read) + _nbChunk * m_sizeChunk; - m_size -= _nbChunk; - // update output pointer in case of flush with 0 data - _data = static_cast(_data) + _nbChunk * m_sizeChunk; - } else { - // copy data to the end of buffer - memcpy(_data, m_read, usedSizeBeforeEnd * m_sizeChunk); - // update Writing pointer ==> end of buffer ==> go to the start - m_read = &m_data[0]; - _data = static_cast(_data) + usedSizeBeforeEnd * m_sizeChunk; - m_size -= usedSizeBeforeEnd; - // get the number of element we need to write - _nbChunk -= usedSizeBeforeEnd; - // Copy the last data if needed - if (0 != _nbChunk) { - memcpy(_data, m_read, _nbChunk * m_sizeChunk); - // update Writing pointer - m_read = static_cast(m_read) + _nbChunk * m_sizeChunk; - m_size -= _nbChunk; - // update output pointer in case of flush with 0 data - _data = static_cast(_data) + _nbChunk * m_sizeChunk; - } - } - } else { - nbElementDrop = _nbChunk; - } - if (0 < nbElementDrop) { - // set 0 in last element of the output - memset(_data, 0, m_sizeChunk * nbElementDrop); - } - // return the number of element droped - return nbElementDrop; -} - -void river::CircularBuffer::setReadPosition(const std11::chrono::system_clock::time_point& _time) { - // Critical section (theoriquely protected by Mutex) - size_t usedSizeBeforeEnd = getUsedSizeBeforEnd(); - if (0 < m_size) { - // check the time of the read : - std11::chrono::nanoseconds deltaTime = _time - m_timeRead; - size_t nbSampleToRemove = int64_t(m_frequency)*int64_t(deltaTime.count())/1000000000LL; - nbSampleToRemove = std::min(nbSampleToRemove, m_size); - RIVER_VERBOSE("Remove sample in the buffer " << nbSampleToRemove << " / " << m_size); - std11::chrono::nanoseconds updateTime((int64_t(nbSampleToRemove)*1000000000LL)/int64_t(m_frequency)); - RIVER_VERBOSE(" add time : " << updateTime.count() << "ns / " << deltaTime.count() << "ns"); - if (usedSizeBeforeEnd >= nbSampleToRemove) { - usedSizeBeforeEnd -= nbSampleToRemove; - m_size -= nbSampleToRemove; - m_read = static_cast(m_read) + nbSampleToRemove * m_sizeChunk; - } else { - nbSampleToRemove -= usedSizeBeforeEnd; - m_size -= nbSampleToRemove; - m_read = &m_data[0] + nbSampleToRemove*m_sizeChunk; - } - m_timeRead += updateTime; - //m_timeRead += deltaTime; - } else { - m_timeRead = std11::chrono::system_clock::time_point(); - } -} - - -size_t river::CircularBuffer::getFreeSize() const { - return m_capacity - m_size; -} - -void river::CircularBuffer::clear() { - RIVER_DEBUG("buffer clear()"); - // set pointer to the start - m_read = &m_data[0]; - m_write = &m_data[0]; - // Clean the number of element in the buffer - m_size = 0; - // Clean all element inside : - memset(&m_data[0], 0, m_sizeChunk * m_capacity); -} diff --git a/river/CircularBuffer.h b/river/CircularBuffer.h deleted file mode 100644 index 4d5aa0a..0000000 --- a/river/CircularBuffer.h +++ /dev/null @@ -1,139 +0,0 @@ -/** @file - * @author Edouard DUPIN - * @copyright 2011, Edouard DUPIN, all right reserved - * @license APACHE v2.0 (see license file) - */ - - -#ifndef __RIVER_CIRCULAR_BUFFER_H__ -#define __RIVER_CIRCULAR_BUFFER_H__ - -#include -#include -#include - -namespace river { - /** - * For these functions we have 4 solutions : - * - Free Buffer - * ---------------------------------------------------------- - * m_data | | | - * ---------------------------------------------------------- - * m_write - * m_read - * - Full Buffer - * ---------------------------------------------------------- - * m_data |****************************|***************************| - * ---------------------------------------------------------- - * m_write - * m_read - * - Buffer in used - * ---------------------------------------------------------- - * m_data | |********************| | - * ---------------------------------------------------------- - * m_read m_write - * - Buffer out used - * ---------------------------------------------------------- - * m_data |****************| |******************| - * ---------------------------------------------------------- - * m_read m_write - */ - class CircularBuffer { - private: - std::vector m_data; //!< data pointer - void* m_write; //!< write pointer - void* m_read; //!< read pointer - std11::chrono::system_clock::time_point m_timeRead; //!< current read time - uint32_t m_frequency; - // TODO : Remove the m_size ==> this is a bad element to be mutex-less - size_t m_size; //!< number of chunk availlable in this buffer - size_t m_capacity; //!< number of chunk available in this Buffer - size_t m_sizeChunk; //!< Size of one chunk (in byte) - public: - CircularBuffer(); - ~CircularBuffer(); - /** - * @brief copy contructor. - * @param[in] _obj Circular buffer object - */ - CircularBuffer(const river::CircularBuffer& _obj); - /** - * @brief copy operator. - * @param[in] _obj Circular buffer object - */ - CircularBuffer& operator=(const river::CircularBuffer& _obj); - /** - * @brief set the capacity of the circular buffer. - * @param[in] _capacity Number of chunk in the buffer. - * @param[in] _chunkSize Size of one chunk. - * @param[in] _frequency Frequency of the buffer - */ - void setCapacity(size_t _capacity, size_t _chunkSize, uint32_t _frequency); - /** - * @brief set the capacity of the circular buffer. - * @param[in] _capacity time in millisecond stored in the buffer. - * @param[in] _chunkSize Size of one chunk. - * @param[in] _frequency Frequency of the buffer - */ - void setCapacity(std11::chrono::milliseconds _capacity, size_t _chunkSize, uint32_t _frequency); - /** - * @brief get free size of the buffer. - * @return Number of free chunk. - */ - size_t getFreeSize() const; - /** - * @brief Get number of chunk in the buffer. - * @return number of chunk. - */ - size_t getSize() const { - return m_size; - } - /** - * @brief Get number of chunk that can be set in the buffer. - * @return number of chunk. - */ - size_t getCapacity() const { - return m_capacity; - } - /** - * @brief Write chunk in the buffer. - * @param[in] _data Pointer on the data. - * @param[in] _nbChunk number of chunk to copy. - * @param[in] _time Time to start write data (if before end ==> not replace data, write only if after end) - * @return Number of chunk copied. - */ - size_t write(const void* _data, size_t _nbChunk, const std11::chrono::system_clock::time_point& _time); - /** - * @brief Read Chunk from the buffer to the pointer data. - * @param[out] _data Pointer on the data. - * @param[in] _nbChunk number of chunk to copy. - * @param[in] _time Time to start read data (if before start ==> add 0 at start, if after, remove unread data) - * @return Number of chunk copied. - */ - size_t read(void* _data, size_t _nbChunk, const std11::chrono::system_clock::time_point& _time); - //! @previous - size_t read(void* _data, size_t _nbChunk); - void setReadPosition(const std11::chrono::system_clock::time_point& _time); - - std11::chrono::system_clock::time_point getReadTimeStamp() { - return m_timeRead; - } - /** - * @brief Clear the buffer. - */ - void clear(); - private: - /** - * @brief Get number of free chunks before end of buffer. - * @return Number of chunk. - */ - size_t getFreeSizeBeforEnd() const; - /** - * @brief Get number of used chunks before end of buffer. - * @return Number of chunk. - */ - size_t getUsedSizeBeforEnd() const; - }; -} - -#endif \ No newline at end of file diff --git a/river/Interface.cpp b/river/Interface.cpp index 168f5e8..71c8523 100644 --- a/river/Interface.cpp +++ b/river/Interface.cpp @@ -282,20 +282,123 @@ size_t river::Interface::size() const { return 0; } + + + + void river::Interface::setBufferSize(size_t _nbChunk) { std11::unique_lock lock(m_mutex); - m_process.updateInterAlgo(); - // TODO :... - + if (m_node->isInput() == true) { + std11::shared_ptr algo = m_process.get(m_process.size()-1); + if (algo == nullptr) { + RIVER_ERROR("Request set buffer size for Interface that is not READ or WRITE mode ..."); + return; + } + algo->setBufferSize(_nbChunk); + return; + } + std11::shared_ptr algo = m_process.get(0); + if (algo == nullptr) { + RIVER_ERROR("Request set buffer size for Interface that is not READ or WRITE mode ..."); + return; + } + algo->setBufferSize(_nbChunk); } void river::Interface::setBufferSize(const std11::chrono::microseconds& _time) { std11::unique_lock lock(m_mutex); - m_process.updateInterAlgo(); - // TODO :... + if (m_node->isInput() == true) { + std11::shared_ptr algo = m_process.get(m_process.size()-1); + if (algo == nullptr) { + RIVER_ERROR("Request set buffer size for Interface that is not READ or WRITE mode ..."); + return; + } + algo->setBufferSize(_time); + return; + } + std11::shared_ptr algo = m_process.get(0); + if (algo == nullptr) { + RIVER_ERROR("Request set buffer size for Interface that is not READ or WRITE mode ..."); + return; + } + algo->setBufferSize(_time); +} + +size_t river::Interface::getBufferSize() { + std11::unique_lock lock(m_mutex); + if (m_node->isInput() == true) { + std11::shared_ptr algo = m_process.get(m_process.size()-1); + if (algo == nullptr) { + RIVER_ERROR("Request get buffer size for Interface that is not READ or WRITE mode ..."); + return 0; + } + return algo->getBufferSize(); + } + std11::shared_ptr algo = m_process.get(0); + if (algo == nullptr) { + RIVER_ERROR("Request get buffer size for Interface that is not READ or WRITE mode ..."); + return 0; + } + return algo->getBufferSize(); +} + +std11::chrono::microseconds river::Interface::getBufferSizeMicrosecond() { + std11::unique_lock lock(m_mutex); + if (m_node->isInput() == true) { + std11::shared_ptr algo = m_process.get(m_process.size()-1); + if (algo == nullptr) { + RIVER_ERROR("Request get buffer size for Interface that is not READ or WRITE mode ..."); + return std11::chrono::microseconds(0); + } + return algo->getBufferSizeMicrosecond(); + } + std11::shared_ptr algo = m_process.get(0); + if (algo == nullptr) { + RIVER_ERROR("Request get buffer size for Interface that is not READ or WRITE mode ..."); + return std11::chrono::microseconds(0); + } + return algo->getBufferSizeMicrosecond(); +} + +size_t river::Interface::getBufferFillSize() { + std11::unique_lock lock(m_mutex); + if (m_node->isInput() == true) { + std11::shared_ptr algo = m_process.get(m_process.size()-1); + if (algo == nullptr) { + RIVER_ERROR("Request get buffer size for Interface that is not READ or WRITE mode ..."); + return 0; + } + return algo->getBufferFillSize(); + } + std11::shared_ptr algo = m_process.get(0); + if (algo == nullptr) { + RIVER_ERROR("Request get buffer size for Interface that is not READ or WRITE mode ..."); + return 0; + } + return algo->getBufferFillSize(); } +std11::chrono::microseconds river::Interface::getBufferFillSizeMicrosecond() { + std11::unique_lock lock(m_mutex); + if (m_node->isInput() == true) { + std11::shared_ptr algo = m_process.get(m_process.size()-1); + if (algo == nullptr) { + RIVER_ERROR("Request get buffer size for Interface that is not READ or WRITE mode ..."); + return std11::chrono::microseconds(0); + } + return algo->getBufferFillSizeMicrosecond(); + } + std11::shared_ptr algo = m_process.get(0); + if (algo == nullptr) { + RIVER_ERROR("Request get buffer size for Interface that is not READ or WRITE mode ..."); + return std11::chrono::microseconds(0); + } + return algo->getBufferFillSizeMicrosecond(); +} + + + void river::Interface::clearInternalBuffer() { std11::unique_lock lock(m_mutex); m_process.updateInterAlgo(); diff --git a/river/Interface.h b/river/Interface.h index 15f23b4..dc5e0b1 100644 --- a/river/Interface.h +++ b/river/Interface.h @@ -189,10 +189,30 @@ namespace river { */ virtual void setBufferSize(size_t _nbChunk); /** - * @brief Set buffer size in chunk number - * @param[in] _nbChunk Number of chunk in the buffer + * @brief Set buffer size size of the buffer with the stored time in µs + * @param[in] _time Time in microsecond of the buffer */ virtual void setBufferSize(const std11::chrono::microseconds& _time); + /** + * @brief get buffer size in chunk number + * @return Number of chunk that can be written in the buffer + */ + virtual size_t getBufferSize(); + /** + * @brief Set buffer size size of the buffer with the stored time in µs + * @return Time in microsecond that can be written in the buffer + */ + virtual std11::chrono::microseconds getBufferSizeMicrosecond(); + /** + * @brief Get buffer size filled in chunk number + * @return Number of chunk in the buffer (that might be read/write) + */ + virtual size_t getBufferFillSize(); + /** + * @brief Set buffer size size of the buffer with the stored time in µs + * @return Time in microsecond of the buffer (that might be read/write) + */ + virtual std11::chrono::microseconds getBufferFillSizeMicrosecond(); /** * @brief Remove internal Buffer */ diff --git a/river/io/Node.cpp b/river/io/Node.cpp index b0a8c17..81cbfeb 100644 --- a/river/io/Node.cpp +++ b/river/io/Node.cpp @@ -139,6 +139,7 @@ void river::io::Node::registerAsRemote(const std11::shared_ptr while (it != m_listAvaillable.end()) { if (it->expired() == true) { it = m_listAvaillable.erase(it); + continue; } ++it; } diff --git a/river/io/NodeAEC.h b/river/io/NodeAEC.h index 3c762fb..55ba9ff 100644 --- a/river/io/NodeAEC.h +++ b/river/io/NodeAEC.h @@ -9,7 +9,7 @@ #include #include -#include +#include namespace river { namespace io { @@ -48,8 +48,8 @@ namespace river { enum audio::format _format, uint32_t _frequency, const std::vector& _map); - river::CircularBuffer m_bufferMicrophone; - river::CircularBuffer m_bufferFeedBack; + drain::CircularBuffer m_bufferMicrophone; + drain::CircularBuffer m_bufferFeedBack; std11::chrono::nanoseconds m_sampleTime; //!< represent the sample time at the specify frequency. void process(); void processAEC(void* _dataMic, void* _dataFB, uint32_t _nbChunk, const std11::chrono::system_clock::time_point& _time); diff --git a/river/io/NodeMuxer.h b/river/io/NodeMuxer.h index 7ff019e..c34671f 100644 --- a/river/io/NodeMuxer.h +++ b/river/io/NodeMuxer.h @@ -9,7 +9,7 @@ #include #include -#include +#include namespace river { namespace io { @@ -50,8 +50,8 @@ namespace river { const std::vector& _map); std::vector m_mapInput1; std::vector m_mapInput2; - river::CircularBuffer m_bufferInput1; - river::CircularBuffer m_bufferInput2; + drain::CircularBuffer m_bufferInput1; + drain::CircularBuffer m_bufferInput2; std11::chrono::nanoseconds m_sampleTime; //!< represent the sample time at the specify frequency. void process(); void processMuxer(void* _dataMic, void* _dataFB, uint32_t _nbChunk, const std11::chrono::system_clock::time_point& _time); diff --git a/test/catkin/CMakeLists.txt b/test/catkin/CMakeLists.txt index 8a0f941..0983495 100644 --- a/test/catkin/CMakeLists.txt +++ b/test/catkin/CMakeLists.txt @@ -50,7 +50,7 @@ target_link_libraries(${PROJECT_NAME} ############# ## Mark executables and/or libraries for installation -install(TARGETS ${PROJECT_NAME} - RUNTIME DESTINATION ${CATKIN_PACKAGE_BIN_DESTINATION} -) +#install(TARGETS ${PROJECT_NAME} +# RUNTIME DESTINATION ${CATKIN_PACKAGE_BIN_DESTINATION} +#)