diff --git a/lutin_river.py b/lutin_river.py index 30ad581..0468d23 100644 --- a/lutin_river.py +++ b/lutin_river.py @@ -14,6 +14,7 @@ def create(target): 'river/debug.cpp', 'river/Manager.cpp', 'river/Interface.cpp', + 'river/CircularBuffer.cpp', 'river/io/Node.cpp', 'river/io/NodeAirTAudio.cpp', 'river/io/NodeAEC.cpp', diff --git a/river/CircularBuffer.cpp b/river/CircularBuffer.cpp new file mode 100644 index 0000000..37decca --- /dev/null +++ b/river/CircularBuffer.cpp @@ -0,0 +1,274 @@ +/** @file + * @author Edouard DUPIN + * @copyright 2011, Edouard DUPIN, all right reserved + * @license APACHE v2.0 (see license file) + */ + +#include +#include + +river::CircularBuffer::CircularBuffer(const river::CircularBuffer& _obj) : + m_data(), + m_write(nullptr), + m_read(nullptr), + m_timeRead(), + m_capacity(0), + m_sizeChunk(0), + m_size(0) { + RIVER_CRITICAL("error"); +}; +/** + * @brief copy operator. + */ +river::CircularBuffer& river::CircularBuffer::operator=(const river::CircularBuffer& _obj) { + RIVER_CRITICAL("error"); + return *this; +}; + +river::CircularBuffer::CircularBuffer() : + m_data(), + m_write(nullptr), + m_read(nullptr), + m_timeRead(), + m_capacity(0), + m_sizeChunk(0), + m_size(0) { + // nothing to do ... +} + +river::CircularBuffer::~CircularBuffer() { + m_data.clear(); + m_read = nullptr; + m_write = nullptr; +} + +void river::CircularBuffer::setCapacity(size_t _capacity, size_t _chunkSize, uint32_t _frequency) { + if ( _chunkSize == m_sizeChunk + && _capacity == m_capacity) { + clear(); + return; + } + RIVER_DEBUG("buffer setCapacity(" << _capacity << "," << _chunkSize << ")"); + m_data.clear(); + m_write = nullptr; + m_read = nullptr; + m_frequency = _frequency; + m_capacity = _capacity; + m_sizeChunk = _chunkSize; + m_size = 0; + if ( _capacity == 0 + || _chunkSize == 0) { + m_capacity = 0; + m_sizeChunk = 0; + return; + } + m_data.resize(m_capacity*m_sizeChunk, 0); + m_read = &m_data[0]; + m_write = &m_data[0]; +} + +void river::CircularBuffer::setCapacity(std::chrono::milliseconds _capacity, size_t _chunkSize, uint32_t _frequency) { + uint32_t nbSampleNeeded = _frequency*_capacity.count()/1000; + RIVER_DEBUG("buffer setCapacity(" << _capacity.count() << "ms ," << _chunkSize << ")"); + setCapacity(nbSampleNeeded, _chunkSize, _frequency); +} + + +size_t river::CircularBuffer::getUsedSizeBeforEnd() const { + size_t size; + if (m_read < m_write) { + size = static_cast(m_write) - static_cast(m_read); + // the size result is in bytes we need to have it in element + size /= m_sizeChunk; + } else if ( m_read == m_write + && m_size == 0) { + // no element in the buffer + size = 0; + } else { + size = &m_data[0] + (m_capacity*m_sizeChunk) - static_cast(m_read); + // the size result is in bytes we need to have it in element + size /= m_sizeChunk; + } + return size; +} + +size_t river::CircularBuffer::getFreeSizeBeforEnd() const { + size_t size; + size = &m_data[0] + + (m_capacity*m_sizeChunk) + - static_cast(m_write); + // the size result is in Octet we need to have it in element + size /= m_sizeChunk; + return size; +} + +size_t river::CircularBuffer::write(const void* _data, size_t _nbChunk, const std::chrono::system_clock::time_point& _time) { + size_t nbElementDrop = 0; + size_t freeSizeBeforeEnd = getFreeSizeBeforEnd(); + size_t freeSize = m_capacity - m_size; + // Write element in all case + // calculate the number of element that are overwritten + if (freeSize < _nbChunk) { + nbElementDrop = _nbChunk - freeSize; + } + // if User Request a write more important than the size of the buffer ==> update the pointer to feet only on the buffer size + if (m_capacity < _nbChunk) { + RIVER_WARNING("CircularBuffer Write too BIG " << _nbChunk << " buffer max size : " << m_capacity << " (keep last Elements)"); + // Move data pointer + _data = static_cast(_data) + (_nbChunk - m_capacity) * m_sizeChunk; + // update size + _nbChunk = m_capacity; + } + // if no element in the FIFO ==> first time write or no more data inside ==> start set the file of the read data ... + if (m_size == 0) { + m_timeRead = _time; + } + // TODO : Check time to push continuous data ... + + if (freeSizeBeforeEnd >= _nbChunk) { + // all Data will be copy + memcpy(m_write, _data, _nbChunk * m_sizeChunk); + // update Writing pointer + if (freeSizeBeforeEnd == _nbChunk) { + // update to the end of FIFO ==> update to the start + m_write = &m_data[0]; + } else { + m_write = static_cast(m_write) + _nbChunk * m_sizeChunk; + } + // Update the number of element in the buffer. + m_size += _nbChunk; + } else { + // copy data to the end of buffer + memcpy(m_write, _data, freeSizeBeforeEnd * m_sizeChunk); + // update Writing pointer ==> end of buffer ==> go to the start + m_write = &m_data[0]; + // update data pointer + _data = static_cast(_data) + freeSizeBeforeEnd * m_sizeChunk; + m_size += freeSizeBeforeEnd; + // get the number of element we need to write + _nbChunk -= freeSizeBeforeEnd; + // Copy the las data if needed + if (_nbChunk != 0) { + memcpy(m_write, _data, _nbChunk * m_sizeChunk); + // update Writing pointer + m_write = static_cast(m_write) + _nbChunk * m_sizeChunk; + m_size += _nbChunk; + } + } + if (nbElementDrop > 0) { + // if drop element we need to update the reading pointer + m_read = m_write; + m_size = m_capacity; + } + // return the number of element Overwrite + return nbElementDrop; +} + +size_t river::CircularBuffer::read(void* _data, size_t _nbChunk) { + return read(_data, _nbChunk, m_timeRead); +} + +size_t river::CircularBuffer::read(void* _data, size_t _nbChunk, const std::chrono::system_clock::time_point& _time) { + size_t nbElementDrop = 0; + // Critical section (theoriquely protected by Mutex) + size_t usedSizeBeforeEnd = getUsedSizeBeforEnd(); + // verify if we have elements in the Buffer + if (0 < m_size) { + // check the time of the read : + std::chrono::nanoseconds deltaTime = m_timeRead - _time; + if (deltaTime.count() == 0) { + // nothing to do ==> just copy data ... + } else if (deltaTime.count() > 0) { + // Add empty sample in the output buffer ... + size_t nbSampleEmpty = m_frequency*deltaTime.count()/100000000; + nbSampleEmpty = std::min(nbSampleEmpty, _nbChunk); + RIVER_WARNING("add Empty sample in the output buffer " << nbSampleEmpty << " / " << _nbChunk); + memset(_data, 0, nbSampleEmpty * m_sizeChunk); + if (nbSampleEmpty == _nbChunk) { + return 0; + } + _nbChunk -= nbSampleEmpty; + } else { + // Remove data from the FIFO + setReadPosition(_time); + } + if (m_size < _nbChunk) { + nbElementDrop = _nbChunk - m_size; + _nbChunk = m_size; + } + m_timeRead += std::chrono::microseconds(_nbChunk*1000000/m_frequency); + if (usedSizeBeforeEnd >= _nbChunk) { + // all Data will be copy + memcpy(_data, m_read, _nbChunk * m_sizeChunk); + // update Writing pointer + m_read = static_cast(m_read) + _nbChunk * m_sizeChunk; + m_size -= _nbChunk; + // update output pointer in case of flush with 0 data + _data = static_cast(_data) + _nbChunk * m_sizeChunk; + } else { + // copy data to the end of buffer + memcpy(_data, m_read, usedSizeBeforeEnd * m_sizeChunk); + // update Writing pointer ==> end of buffer ==> go to the start + m_read = &m_data[0]; + _data = static_cast(_data) + usedSizeBeforeEnd * m_sizeChunk; + m_size -= usedSizeBeforeEnd; + // get the number of element we need to write + _nbChunk -= usedSizeBeforeEnd; + // Copy the last data if needed + if (0 != _nbChunk) { + memcpy(_data, m_read, _nbChunk * m_sizeChunk); + // update Writing pointer + m_read = static_cast(m_read) + _nbChunk * m_sizeChunk; + m_size -= _nbChunk; + // update output pointer in case of flush with 0 data + _data = static_cast(_data) + _nbChunk * m_sizeChunk; + } + } + } else { + nbElementDrop = _nbChunk; + } + if (0 < nbElementDrop) { + // set 0 in last element of the output + memset(_data, 0, m_sizeChunk * nbElementDrop); + } + // return the number of element droped + return nbElementDrop; +} +void river::CircularBuffer::setReadPosition(const std::chrono::system_clock::time_point& _time) { + // Critical section (theoriquely protected by Mutex) + size_t usedSizeBeforeEnd = getUsedSizeBeforEnd(); + if (0 < m_size) { + // check the time of the read : + std::chrono::nanoseconds deltaTime = m_timeRead - _time; + size_t nbSampleToRemove = m_frequency*-deltaTime.count()/100000000; + nbSampleToRemove = std::min(nbSampleToRemove, m_size); + RIVER_WARNING("Remove sample in the buffer " << nbSampleToRemove << " / " << m_size); + if (usedSizeBeforeEnd >= nbSampleToRemove) { + usedSizeBeforeEnd -= nbSampleToRemove; + m_size -= nbSampleToRemove; + m_read = static_cast(m_read) + nbSampleToRemove * m_sizeChunk; + } else { + nbSampleToRemove -= usedSizeBeforeEnd; + m_size -= nbSampleToRemove; + m_read = &m_data[0] + nbSampleToRemove*m_sizeChunk; + } + m_timeRead += deltaTime; + } else { + m_timeRead = std::chrono::system_clock::time_point(); + } +} + +size_t river::CircularBuffer::getFreeSize() const { + return m_capacity - m_size; +} + +void river::CircularBuffer::clear() { + RIVER_DEBUG("buffer clear()"); + // set pointer to the start + m_read = &m_data[0]; + m_write = &m_data[0]; + // Clean the number of element in the buffer + m_size = 0; + // Clean all element inside : + memset(&m_data[0], 0, m_sizeChunk * m_capacity); +} diff --git a/river/CircularBuffer.h b/river/CircularBuffer.h new file mode 100644 index 0000000..286d434 --- /dev/null +++ b/river/CircularBuffer.h @@ -0,0 +1,139 @@ +/** @file + * @author Edouard DUPIN + * @copyright 2011, Edouard DUPIN, all right reserved + * @license APACHE v2.0 (see license file) + */ + + +#ifndef __RIVER_CIRCULAR_BUFFER_H__ +#define __RIVER_CIRCULAR_BUFFER_H__ + +#include +#include +#include + +namespace river { + /** + * For these functions we have 4 solutions : + * - Free Buffer + * ---------------------------------------------------------- + * m_data | | | + * ---------------------------------------------------------- + * m_write + * m_read + * - Full Buffer + * ---------------------------------------------------------- + * m_data |****************************|***************************| + * ---------------------------------------------------------- + * m_write + * m_read + * - Buffer in used + * ---------------------------------------------------------- + * m_data | |********************| | + * ---------------------------------------------------------- + * m_read m_write + * - Buffer out used + * ---------------------------------------------------------- + * m_data |****************| |******************| + * ---------------------------------------------------------- + * m_read m_write + */ + class CircularBuffer { + private: + std::vector m_data; //!< data pointer + void* m_write; //!< write pointer + void* m_read; //!< read pointer + std::chrono::system_clock::time_point m_timeRead; //!< current read time + uint32_t m_frequency; + // TODO : Remove the m_size ==> this is a bad element to be mutex-less + size_t m_size; //!< number of chunk availlable in this buffer + size_t m_capacity; //!< number of chunk available in this Buffer + size_t m_sizeChunk; //!< Size of one chunk (in byte) + public: + CircularBuffer(); + ~CircularBuffer(); + /** + * @brief copy contructor. + * @param[in] _obj Circular buffer object + */ + CircularBuffer(const river::CircularBuffer& _obj); + /** + * @brief copy operator. + * @param[in] _obj Circular buffer object + */ + CircularBuffer& operator=(const river::CircularBuffer& _obj); + /** + * @brief set the capacity of the circular buffer. + * @param[in] _capacity Number of chunk in the buffer. + * @param[in] _chunkSize Size of one chunk. + * @param[in] _frequency Frequency of the buffer + */ + void setCapacity(size_t _capacity, size_t _chunkSize, uint32_t _frequency); + /** + * @brief set the capacity of the circular buffer. + * @param[in] _capacity time in millisecond stored in the buffer. + * @param[in] _chunkSize Size of one chunk. + * @param[in] _frequency Frequency of the buffer + */ + void setCapacity(std::chrono::milliseconds _capacity, size_t _chunkSize, uint32_t _frequency); + /** + * @brief get free size of the buffer. + * @return Number of free chunk. + */ + size_t getFreeSize() const; + /** + * @brief Get number of chunk in the buffer. + * @return number of chunk. + */ + size_t getSize() const { + return m_size; + } + /** + * @brief Get number of chunk that can be set in the buffer. + * @return number of chunk. + */ + size_t getCapacity() const { + return m_capacity; + } + /** + * @brief Write chunk in the buffer. + * @param[in] _data Pointer on the data. + * @param[in] _nbChunk number of chunk to copy. + * @param[in] _time Time to start write data (if before end ==> not replace data, write only if after end) + * @return Number of chunk copied. + */ + size_t write(const void* _data, size_t _nbChunk, const std::chrono::system_clock::time_point& _time); + /** + * @brief Read Chunk from the buffer to the pointer data. + * @param[out] _data Pointer on the data. + * @param[in] _nbChunk number of chunk to copy. + * @param[in] _time Time to start read data (if before start ==> add 0 at start, if after, remove unread data) + * @return Number of chunk copied. + */ + size_t read(void* _data, size_t _nbChunk, const std::chrono::system_clock::time_point& _time); + //! @previous + size_t read(void* _data, size_t _nbChunk); + void setReadPosition(const std::chrono::system_clock::time_point& _time); + + std::chrono::system_clock::time_point getReadTimeStamp() { + return m_timeRead; + } + /** + * @brief Clear the buffer. + */ + void clear(); + private: + /** + * @brief Get number of free chunks before end of buffer. + * @return Number of chunk. + */ + size_t getFreeSizeBeforEnd() const; + /** + * @brief Get number of used chunks before end of buffer. + * @return Number of chunk. + */ + size_t getUsedSizeBeforEnd() const; + }; +} + +#endif \ No newline at end of file diff --git a/river/Interface.cpp b/river/Interface.cpp index ac997ac..c1aebf8 100644 --- a/river/Interface.cpp +++ b/river/Interface.cpp @@ -45,7 +45,8 @@ bool river::Interface::init(const std::string& _name, // register interface to be notify from the volume change. m_node->registerAsRemote(shared_from_this()); // Create convertion interface - if (m_node->isInput() == true) { + if ( m_node->isInput() == true + && m_mode == river::modeInterface_input) { m_process.setInputConfig(m_node->getInterfaceFormat()); // add all time the volume stage : std::shared_ptr algo = drain::Volume::create(); @@ -59,7 +60,8 @@ bool river::Interface::init(const std::string& _name, algo->addVolumeStage(tmpVolume); } m_process.setOutputConfig(drain::IOFormatInterface(_map, _format, _freq)); - } else { + } else if ( m_node->isOutput() == true + && m_mode == river::modeInterface_output) { m_process.setInputConfig(drain::IOFormatInterface(_map, _format, _freq)); // add all time the volume stage : std::shared_ptr algo = drain::Volume::create(); @@ -73,6 +75,19 @@ bool river::Interface::init(const std::string& _name, algo->addVolumeStage(tmpVolume); } m_process.setOutputConfig(m_node->getInterfaceFormat()); + } else if ( m_node->isOutput() == true + && m_mode == river::modeInterface_feedback) { + m_process.setInputConfig(m_node->getHarwareFormat()); + // add all time the volume stage : + std::shared_ptr algo = drain::Volume::create(); + //algo->setInputFormat(m_node->getInterfaceFormat()); + algo->setName("volume"); + m_process.pushBack(algo); + // note : feedback has no volume stage ... + m_process.setOutputConfig(drain::IOFormatInterface(_map, _format, _freq)); + } else { + RIVER_ERROR("Can not link virtual interface with type : " << m_mode << " to a hardware interface " << (m_node->isInput()==true?"input":"output")); + return false; } return true; } @@ -116,7 +131,7 @@ void river::Interface::setReadwrite() { } } -void river::Interface::setOutputCallback(size_t _chunkSize, drain::needDataFunction _function) { +void river::Interface::setOutputCallback(drain::playbackFunction _function) { std::unique_lock lock(m_mutex); m_process.removeAlgoDynamic(); m_process.removeIfFirst(); @@ -124,7 +139,7 @@ void river::Interface::setOutputCallback(size_t _chunkSize, drain::needDataFunct m_process.pushFront(algo); } -void river::Interface::setInputCallback(size_t _chunkSize, drain::haveNewDataFunction _function) { +void river::Interface::setInputCallback(drain::recordFunction _function) { std::unique_lock lock(m_mutex); m_process.removeAlgoDynamic(); m_process.removeIfLast(); @@ -132,7 +147,7 @@ void river::Interface::setInputCallback(size_t _chunkSize, drain::haveNewDataFun m_process.pushBack(algo); } -void river::Interface::setWriteCallback(drain::needDataFunctionWrite _function) { +void river::Interface::setWriteCallback(drain::playbackFunctionWrite _function) { std::unique_lock lock(m_mutex); m_process.removeAlgoDynamic(); std::shared_ptr algo = m_process.get(0); diff --git a/river/Interface.h b/river/Interface.h index 2932968..a80a471 100644 --- a/river/Interface.h +++ b/river/Interface.h @@ -96,9 +96,9 @@ namespace river { /** * @brief When we want to implement a Callback Mode: */ - virtual void setWriteCallback(drain::needDataFunctionWrite _function); - virtual void setOutputCallback(size_t _chunkSize, drain::needDataFunction _function); - virtual void setInputCallback(size_t _chunkSize, drain::haveNewDataFunction _function); + virtual void setWriteCallback(drain::playbackFunctionWrite _function); + virtual void setOutputCallback(drain::playbackFunction _function); + virtual void setInputCallback(drain::recordFunction _function); /** * @brief Add a volume group of the current channel. * @note If you do not call this function with the group "FLOW" you chan not have a channel volume. diff --git a/river/io/Node.h b/river/io/Node.h index c97525f..d622b27 100644 --- a/river/io/Node.h +++ b/river/io/Node.h @@ -50,7 +50,6 @@ namespace river { return m_process.getInputConfig(); } } - protected: const drain::IOFormatInterface& getHarwareFormat() { if (m_isInput == true) { return m_process.getInputConfig(); diff --git a/river/io/NodeAEC.cpp b/river/io/NodeAEC.cpp index a417e49..15f3cfe 100644 --- a/river/io/NodeAEC.cpp +++ b/river/io/NodeAEC.cpp @@ -149,6 +149,7 @@ river::io::NodeAEC::NodeAEC(const std::string& _name, const std::shared_ptr feedbackMap; feedbackMap.push_back(audio::channel_frontCenter); + RIVER_INFO("Create FEEDBACK : "); m_interfaceFeedBack = createInput(hardwareFormat.getFrequency(), feedbackMap, hardwareFormat.getFormat(), @@ -158,6 +159,7 @@ river::io::NodeAEC::NodeAEC(const std::string& _name, const std::shared_ptrsetInputCallback(1024, - std::bind(&river::io::NodeAEC::onDataReceivedFeedBack, + m_interfaceFeedBack->setInputCallback(std::bind(&river::io::NodeAEC::onDataReceivedFeedBack, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4, - std::placeholders::_5)); + std::placeholders::_5, + std::placeholders::_6)); // set callback mode ... - m_interfaceMicrophone->setInputCallback(1024, - std::bind(&river::io::NodeAEC::onDataReceivedMicrophone, + m_interfaceMicrophone->setInputCallback(std::bind(&river::io::NodeAEC::onDataReceivedMicrophone, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4, - std::placeholders::_5)); + std::placeholders::_5, + std::placeholders::_6)); + + m_bufferMicrophone.setCapacity(std::chrono::milliseconds(1000), + audio::getFormatBytes(hardwareFormat.getFormat())*hardwareFormat.getMap().size(), + hardwareFormat.getFrequency()); + m_bufferFeedBack.setCapacity(std::chrono::milliseconds(1000), + audio::getFormatBytes(hardwareFormat.getFormat()), // only one channel ... + hardwareFormat.getFrequency()); m_process.updateInterAlgo(); } @@ -201,9 +210,11 @@ void river::io::NodeAEC::start() { std::unique_lock lock(m_mutex); RIVER_INFO("Start stream : '" << m_name << "' mode=" << (m_isInput?"input":"output") ); if (m_interfaceFeedBack != nullptr) { + RIVER_INFO("Start FEEDBACK : "); m_interfaceFeedBack->start(); } if (m_interfaceMicrophone != nullptr) { + RIVER_INFO("Start Microphone : "); m_interfaceMicrophone->start(); } } @@ -220,35 +231,140 @@ void river::io::NodeAEC::stop() { namespace std { static std::ostream& operator <<(std::ostream& _os, const std::chrono::system_clock::time_point& _obj) { - std::chrono::microseconds us = std::chrono::duration_cast(_obj.time_since_epoch()); - _os << us.count(); + std::chrono::nanoseconds ns = std::chrono::duration_cast(_obj.time_since_epoch()); + int64_t totalSecond = ns.count()/1000000000; + int64_t millisecond = (ns.count()%1000000000)/1000000; + int64_t microsecond = (ns.count()%1000000)/1000; + int64_t nanosecond = ns.count()%1000; + //_os << totalSecond << "s " << millisecond << "ms " << microsecond << "µs " << nanosecond << "ns"; + + int32_t second = totalSecond % 60; + int32_t minute = (totalSecond/60)%60; + int32_t hour = (totalSecond/3600)%24; + int32_t day = (totalSecond/(24*3600))%365; + int32_t year = totalSecond/(24*3600*365); + _os << year << "y " << day << "d " << hour << "h" << minute << ":"<< second << "s " << millisecond << "ms " << microsecond << "µs " << nanosecond << "ns"; return _os; } } +#define SAVE_FILE_MACRO(type,fileName,dataPointer,nbElement) \ + do { \ + static FILE *pointerOnFile = nullptr; \ + static bool errorOpen = false; \ + if (NULL==pointerOnFile) { \ + RIVER_WARNING("open file '" << fileName << "' type=" << #type); \ + pointerOnFile = fopen(fileName,"w"); \ + if ( errorOpen == false \ + && pointerOnFile == nullptr) { \ + RIVER_ERROR("ERROR OPEN file ... '" << fileName << "' type=" << #type); \ + errorOpen=true; \ + } \ + } \ + if (pointerOnFile != nullptr) { \ + fwrite((dataPointer), sizeof(type), (nbElement), pointerOnFile); \ + fflush(pointerOnFile); \ + } \ + }while(0) -void river::io::NodeAEC::onDataReceivedMicrophone(const std::chrono::system_clock::time_point& _time, + +void river::io::NodeAEC::onDataReceivedMicrophone(const void* _data, + const std::chrono::system_clock::time_point& _time, size_t _nbChunk, - const std::vector& _map, - const void* _data, - enum audio::format _type) { - RIVER_INFO("Microphone Time=" << _time << " _nbChunk=" << _nbChunk << " _map=" << _map << " _type=" << _type); - if (_type != audio::format_int16) { + enum audio::format _format, + uint32_t _frequency, + const std::vector& _map) { + RIVER_DEBUG("Microphone Time=" << _time << " _nbChunk=" << _nbChunk << " _map=" << _map << " _format=" << _format << " freq=" << _frequency); + if (_format != audio::format_int16) { RIVER_ERROR("call wrong type ... (need int16_t)"); } // push data synchronize - - // if threaded : send event / otherwise, process ... - newInput(_data, _nbChunk, _time); + std::unique_lock lock(m_mutex); + m_bufferMicrophone.write(_data, _nbChunk, _time); + SAVE_FILE_MACRO(int16_t, "REC_Microphone.raw", _data, _nbChunk*_map.size()); + process(); } -void river::io::NodeAEC::onDataReceivedFeedBack(const std::chrono::system_clock::time_point& _time, +void river::io::NodeAEC::onDataReceivedFeedBack(const void* _data, + const std::chrono::system_clock::time_point& _time, size_t _nbChunk, - const std::vector& _map, - const void* _data, - enum audio::format _type) { - RIVER_INFO("FeedBack Time=" << _time << " _nbChunk=" << _nbChunk << " _map=" << _map << " _type=" << _type); - if (_type != audio::format_int16) { + enum audio::format _format, + uint32_t _frequency, + const std::vector& _map) { + RIVER_DEBUG("FeedBack Time=" << _time << " _nbChunk=" << _nbChunk << " _map=" << _map << " _format=" << _format << " freq=" << _frequency); + if (_format != audio::format_int16) { RIVER_ERROR("call wrong type ... (need int16_t)"); } - // TODO : Call synchro ... + // push data synchronize + std::unique_lock lock(m_mutex); + m_bufferFeedBack.write(_data, _nbChunk, _time); + SAVE_FILE_MACRO(int16_t, "REC_FeedBack.raw", _data, _nbChunk*_map.size()); + process(); } + +void river::io::NodeAEC::process() { + if (m_bufferMicrophone.getSize() <= 256) { + return; + } + if (m_bufferFeedBack.getSize() <= 256) { + return; + } + std::chrono::system_clock::time_point MicTime = m_bufferMicrophone.getReadTimeStamp(); + std::chrono::system_clock::time_point fbTime = m_bufferFeedBack.getReadTimeStamp(); + + // Synchronize if possible + if (MicTime < fbTime) { + RIVER_INFO("micTime < fbTime : Change Microphone time start " << fbTime); + RIVER_INFO(" old time stamp=" << m_bufferMicrophone.getReadTimeStamp()); + m_bufferMicrophone.setReadPosition(fbTime); + RIVER_INFO(" new time stamp=" << m_bufferMicrophone.getReadTimeStamp()); + } + /* + if (MicTime > fbTime) { + RIVER_INFO("micTime > fbTime : Change FeedBack time start " << fbTime); + RIVER_INFO(" old time stamp=" << m_bufferFeedBack.getReadTimeStamp()); + m_bufferFeedBack.setReadPosition(MicTime); + RIVER_INFO(" new time stamp=" << m_bufferFeedBack.getReadTimeStamp()); + }*/ + // check if enought time after synchronisation ... + if (m_bufferMicrophone.getSize() <= 256) { + return; + } + if (m_bufferFeedBack.getSize() <= 256) { + return; + } + + MicTime = m_bufferMicrophone.getReadTimeStamp(); + fbTime = m_bufferFeedBack.getReadTimeStamp(); + + if (MicTime != fbTime) { + RIVER_ERROR("Can not synchronize flow ... : " << MicTime << " != " << fbTime << " delta = " << (MicTime-fbTime).count()/1000 << " µs"); + return; + } + std::vector dataMic; + std::vector dataFB; + dataMic.resize(256*sizeof(int16_t)*2, 0); + dataFB.resize(256*sizeof(int16_t), 0); + while (true) { + MicTime = m_bufferMicrophone.getReadTimeStamp(); + fbTime = m_bufferFeedBack.getReadTimeStamp(); + RIVER_INFO(" process 256 samples ... " << MicTime); + m_bufferMicrophone.read(&dataMic[0], 256); + m_bufferFeedBack.read(&dataFB[0], 256); + SAVE_FILE_MACRO(int16_t, "REC_Microphone_sync.raw", &dataMic[0], 256*2); + SAVE_FILE_MACRO(int16_t, "REC_FeedBack_sync.raw", &dataFB[0], 256); + // if threaded : send event / otherwise, process ... + //processAEC(&dataMic[0], &dataFB[0], 256, _time); + if (m_bufferMicrophone.getSize() <= 256) { + return; + } + if (m_bufferFeedBack.getSize() <= 256) { + return; + } + } +} + + +void river::io::NodeAEC::processAEC(void* _dataMic, void* _dataFB, uint32_t _nbChunk, const std::chrono::system_clock::time_point& _time) { + newInput(_dataMic, _nbChunk, _time); +} + diff --git a/river/io/NodeAEC.h b/river/io/NodeAEC.h index 84144af..1d6d562 100644 --- a/river/io/NodeAEC.h +++ b/river/io/NodeAEC.h @@ -9,6 +9,7 @@ #include #include +#include namespace river { namespace io { @@ -35,17 +36,22 @@ namespace river { audio::format _format, const std::string& _streamName, const std::string& _name); - void onDataReceivedMicrophone(const std::chrono::system_clock::time_point& _playTime, + void onDataReceivedMicrophone(const void* _data, + const std::chrono::system_clock::time_point& _time, size_t _nbChunk, - const std::vector& _map, - const void* _data, - enum audio::format _type); - - void onDataReceivedFeedBack(const std::chrono::system_clock::time_point& _readTime, + enum audio::format _format, + uint32_t _frequency, + const std::vector& _map); + void onDataReceivedFeedBack(const void* _data, + const std::chrono::system_clock::time_point& _time, size_t _nbChunk, - const std::vector& _map, - const void* _data, - enum audio::format _type); + enum audio::format _format, + uint32_t _frequency, + const std::vector& _map); + river::CircularBuffer m_bufferMicrophone; + river::CircularBuffer m_bufferFeedBack; + void process(); + void processAEC(void* _dataMic, void* _dataFB, uint32_t _nbChunk, const std::chrono::system_clock::time_point& _time); }; } } diff --git a/river/io/NodeAirTAudio.cpp b/river/io/NodeAirTAudio.cpp index 9cbe4da..7caf01a 100644 --- a/river/io/NodeAirTAudio.cpp +++ b/river/io/NodeAirTAudio.cpp @@ -31,23 +31,48 @@ namespace std { } -int32_t river::io::NodeAirTAudio::airtAudioCallback(void* _outputBuffer, - void* _inputBuffer, - uint32_t _nbChunk, - const std::chrono::system_clock::time_point& _time, - airtaudio::status _status) { +int32_t river::io::NodeAirTAudio::duplexCallback(const void* _inputBuffer, + const std::chrono::system_clock::time_point& _timeInput, + void* _outputBuffer, + const std::chrono::system_clock::time_point& _timeOutput, + uint32_t _nbChunk, + const std::vector& _status) { std::unique_lock lock(m_mutex); - if (_outputBuffer != nullptr) { - RIVER_VERBOSE("data Output size request :" << _nbChunk << " [BEGIN] status=" << _status << " nbIO=" << m_list.size()); - newOutput(_outputBuffer, _nbChunk, _time); - } + // TODO : Manage status ... if (_inputBuffer != nullptr) { RIVER_VERBOSE("data Input size request :" << _nbChunk << " [BEGIN] status=" << _status << " nbIO=" << m_list.size()); - newInput(_inputBuffer, _nbChunk, _time); + newInput(_inputBuffer, _nbChunk, _timeInput); + } + if (_outputBuffer != nullptr) { + RIVER_VERBOSE("data Output size request :" << _nbChunk << " [BEGIN] status=" << _status << " nbIO=" << m_list.size()); + newOutput(_outputBuffer, _nbChunk, _timeOutput); } return 0; } +int32_t river::io::NodeAirTAudio::recordCallback(const void* _inputBuffer, + const std::chrono::system_clock::time_point& _timeInput, + uint32_t _nbChunk, + const std::vector& _status) { + std::unique_lock lock(m_mutex); + // TODO : Manage status ... + RIVER_VERBOSE("data Input size request :" << _nbChunk << " [BEGIN] status=" << _status << " nbIO=" << m_list.size()); + newInput(_inputBuffer, _nbChunk, _timeInput); + return 0; +} + +int32_t river::io::NodeAirTAudio::playbackCallback(void* _outputBuffer, + const std::chrono::system_clock::time_point& _timeOutput, + uint32_t _nbChunk, + const std::vector& _status) { + std::unique_lock lock(m_mutex); + // TODO : Manage status ... + RIVER_VERBOSE("data Output size request :" << _nbChunk << " [BEGIN] status=" << _status << " nbIO=" << m_list.size()); + newOutput(_outputBuffer, _nbChunk, _timeOutput); + return 0; +} + + std::shared_ptr river::io::NodeAirTAudio::create(const std::string& _name, const std::shared_ptr& _config) { return std::shared_ptr(new river::io::NodeAirTAudio(_name, _config)); @@ -78,6 +103,7 @@ river::io::NodeAirTAudio::NodeAirTAudio(const std::string& _name, const std::sha // intanciate specific API ... m_adac.instanciate(typeInterface); + m_adac.setName(_name); // TODO : Check return ... std::string type = m_config->getStringValue("type", "int16"); if (streamName == "") { @@ -186,24 +212,22 @@ river::io::NodeAirTAudio::NodeAirTAudio(const std::string& _name, const std::sha if (m_isInput == true) { err = m_adac.openStream(nullptr, ¶ms, hardwareFormat.getFormat(), hardwareFormat.getFrequency(), &m_rtaudioFrameSize, - std::bind(&river::io::NodeAirTAudio::airtAudioCallback, + std::bind(&river::io::NodeAirTAudio::recordCallback, this, std::placeholders::_1, std::placeholders::_2, - std::placeholders::_3, - std::placeholders::_4, - std::placeholders::_5) + std::placeholders::_5, + std::placeholders::_6) ); } else { err = m_adac.openStream(¶ms, nullptr, hardwareFormat.getFormat(), hardwareFormat.getFrequency(), &m_rtaudioFrameSize, - std::bind(&river::io::NodeAirTAudio::airtAudioCallback, + std::bind(&river::io::NodeAirTAudio::playbackCallback, this, - std::placeholders::_1, - std::placeholders::_2, std::placeholders::_3, std::placeholders::_4, - std::placeholders::_5) + std::placeholders::_5, + std::placeholders::_6) ); } if (err != airtaudio::error_none) { diff --git a/river/io/NodeAirTAudio.h b/river/io/NodeAirTAudio.h index 4aa9a80..66b86c2 100644 --- a/river/io/NodeAirTAudio.h +++ b/river/io/NodeAirTAudio.h @@ -29,11 +29,20 @@ namespace river { airtaudio::DeviceInfo m_info; unsigned int m_rtaudioFrameSize; public: - int32_t airtAudioCallback(void* _outputBuffer, - void * _inputBuffer, - uint32_t _nbChunk, - const std::chrono::system_clock::time_point& _time, - airtaudio::status _status); + int32_t duplexCallback(const void* _inputBuffer, + const std::chrono::system_clock::time_point& _timeInput, + void* _outputBuffer, + const std::chrono::system_clock::time_point& _timeOutput, + uint32_t _nbChunk, + const std::vector& _status); + int32_t recordCallback(const void* _inputBuffer, + const std::chrono::system_clock::time_point& _timeInput, + uint32_t _nbChunk, + const std::vector& _status); + int32_t playbackCallback(void* _outputBuffer, + const std::chrono::system_clock::time_point& _timeOutput, + uint32_t _nbChunk, + const std::vector& _status); protected: virtual void start(); virtual void stop(); diff --git a/test/main.cpp b/test/main.cpp index dba29e2..bbc888c 100644 --- a/test/main.cpp +++ b/test/main.cpp @@ -12,6 +12,7 @@ #include #include +#include #include #undef __class__ @@ -108,13 +109,15 @@ class testOutWriteCallback { std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, - std::placeholders::_4)); + std::placeholders::_4, + std::placeholders::_5)); } - void onDataNeeded(const std::chrono::system_clock::time_point& _playTime, - const size_t& _nbChunk, - const std::vector& _map, - enum audio::format _type) { - if (_type != audio::format_int16) { + void onDataNeeded(const std::chrono::system_clock::time_point& _time, + size_t _nbChunk, + enum audio::format _format, + uint32_t _frequency, + const std::vector& _map) { + if (_format != audio::format_int16) { APPL_ERROR("call wrong type ... (need int16_t)"); } std::vector data; @@ -170,21 +173,22 @@ class testOutCallback { _io, "WriteModeCallback"); // set callback mode ... - m_interface->setOutputCallback(1024, - std::bind(&testOutCallback::onDataNeeded, + m_interface->setOutputCallback(std::bind(&testOutCallback::onDataNeeded, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4, - std::placeholders::_5)); + std::placeholders::_5, + std::placeholders::_6)); } - void onDataNeeded(const std::chrono::system_clock::time_point& _playTime, - const size_t& _nbChunk, - const std::vector& _map, - void* _data, - enum audio::format _type) { - if (_type != audio::format_int16) { + void onDataNeeded(void* _data, + const std::chrono::system_clock::time_point& _time, + size_t _nbChunk, + enum audio::format _format, + uint32_t _frequency, + const std::vector& _map) { + if (_format != audio::format_int16) { APPL_ERROR("call wrong type ... (need int16_t)"); } int16_t* data = static_cast(_data); @@ -305,21 +309,22 @@ class testInCallback { _input, "WriteModeCallback"); // set callback mode ... - m_interface->setInputCallback(1024, - std::bind(&testInCallback::onDataReceived, + m_interface->setInputCallback(std::bind(&testInCallback::onDataReceived, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4, - std::placeholders::_5)); + std::placeholders::_5, + std::placeholders::_6)); } - void onDataReceived(const std::chrono::system_clock::time_point& _readTime, + void onDataReceived(const void* _data, + const std::chrono::system_clock::time_point& _time, size_t _nbChunk, - const std::vector& _map, - const void* _data, - enum audio::format _type) { - if (_type != audio::format_int16) { + enum audio::format _format, + uint32_t _frequency, + const std::vector& _map) { + if (_format != audio::format_int16) { APPL_ERROR("call wrong type ... (need int16_t)"); } const int16_t* data = static_cast(_data); @@ -347,15 +352,6 @@ TEST(TestALL, testInputCallBack) { process.reset(); usleep(500000); } -TEST(TestALL, testInputCallBackMicClean) { - std::shared_ptr manager; - manager = river::Manager::create("testApplication"); - APPL_INFO("test input (callback mode)"); - std::shared_ptr process = std::make_shared(manager, "microphone-clean"); - process->run(); - process.reset(); - usleep(500000); -} @@ -400,23 +396,24 @@ class testOutCallbackType { "speaker", "WriteModeCallbackType"); // set callback mode ... - m_interface->setOutputCallback(1024, - std::bind(&testOutCallbackType::onDataNeeded, + m_interface->setOutputCallback(std::bind(&testOutCallbackType::onDataNeeded, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4, - std::placeholders::_5)); + std::placeholders::_5, + std::placeholders::_6)); } - void onDataNeeded(const std::chrono::system_clock::time_point& _playTime, - const size_t& _nbChunk, - const std::vector& _map, - void* _data, - enum audio::format _type) { - //APPL_DEBUG("Get data ... " << _type << " map=" << _map << " chunk=" << _nbChunk); + void onDataNeeded(void* _data, + const std::chrono::system_clock::time_point& _time, + size_t _nbChunk, + enum audio::format _format, + uint32_t _frequency, + const std::vector& _map) { + //APPL_DEBUG("Get data ... " << _format << " map=" << _map << " chunk=" << _nbChunk); double baseCycle = 2.0*M_PI/double(m_freq) * double(m_generateFreq); - if (_type == audio::format_int16) { + if (_format == audio::format_int16) { int16_t* data = static_cast(_data); for (int32_t iii=0; iii<_nbChunk; iii++) { for (int32_t jjj=0; jjj<_map.size(); jjj++) { @@ -427,7 +424,7 @@ class testOutCallbackType { m_phase -= 2*M_PI; } } - } else if (_type == audio::format_int16_on_int32) { + } else if (_format == audio::format_int16_on_int32) { int32_t* data = static_cast(_data); for (int32_t iii=0; iii<_nbChunk; iii++) { for (int32_t jjj=0; jjj<_map.size(); jjj++) { @@ -438,7 +435,7 @@ class testOutCallbackType { m_phase -= 2*M_PI; } } - } else if (_type == audio::format_int32) { + } else if (_format == audio::format_int32) { int32_t* data = static_cast(_data); for (int32_t iii=0; iii<_nbChunk; iii++) { for (int32_t jjj=0; jjj<_map.size(); jjj++) { @@ -449,7 +446,7 @@ class testOutCallbackType { m_phase -= 2*M_PI; } } - } else if (_type == audio::format_float) { + } else if (_format == audio::format_float) { float* data = static_cast(_data); for (int32_t iii=0; iii<_nbChunk; iii++) { for (int32_t jjj=0; jjj<_map.size(); jjj++) { @@ -539,25 +536,23 @@ class testCallbackVolume { "speaker", "WriteModeCallback"); // set callback mode ... - m_interface->setOutputCallback(1024, - std::bind(&testCallbackVolume::onDataNeeded, + m_interface->setOutputCallback(std::bind(&testCallbackVolume::onDataNeeded, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4, - std::placeholders::_5)); + std::placeholders::_5, + std::placeholders::_6)); m_interface->addVolumeGroup("MEDIA"); m_interface->addVolumeGroup("FLOW"); } - void onDataNeeded(const std::chrono::system_clock::time_point& _playTime, - const size_t& _nbChunk, - const std::vector& _map, - void* _data, - enum audio::format _type) { - if (_type != audio::format_int16) { - APPL_ERROR("call wrong type ... (need int16_t)"); - } + void onDataNeeded(void* _data, + const std::chrono::system_clock::time_point& _time, + size_t _nbChunk, + enum audio::format _format, + uint32_t _frequency, + const std::vector& _map) { int16_t* data = static_cast(_data); double baseCycle = 2.0*M_PI/(double)48000 * (double)550; for (int32_t iii=0; iii<_nbChunk; iii++) { @@ -610,6 +605,28 @@ class testCallbackVolume { } }; +void threadVolume(void* _userData) { + std::shared_ptr manager; + manager = river::Manager::create("testApplication"); + std::shared_ptr process = std::make_shared(manager); + process->run(); + process.reset(); + usleep(500000); +} + +TEST(TestALL, testInputCallBackMicClean) { + std::shared_ptr manager; + manager = river::Manager::create("testApplication"); + std::thread tmpThread(&threadVolume, nullptr); + usleep(100000); + + APPL_INFO("test input (callback mode)"); + std::shared_ptr process = std::make_shared(manager, "microphone-clean"); + process->run(); + process.reset(); + usleep(500000); +} + TEST(TestALL, testVolume) { std::shared_ptr manager;