[DEV] work on AEC (base)

This commit is contained in:
Edouard DUPIN 2015-02-17 21:08:15 +01:00
parent 5cbdf378f8
commit 4832570448
11 changed files with 721 additions and 121 deletions

View File

@ -14,6 +14,7 @@ def create(target):
'river/debug.cpp',
'river/Manager.cpp',
'river/Interface.cpp',
'river/CircularBuffer.cpp',
'river/io/Node.cpp',
'river/io/NodeAirTAudio.cpp',
'river/io/NodeAEC.cpp',

274
river/CircularBuffer.cpp Normal file
View File

@ -0,0 +1,274 @@
/** @file
* @author Edouard DUPIN
* @copyright 2011, Edouard DUPIN, all right reserved
* @license APACHE v2.0 (see license file)
*/
#include <river/CircularBuffer.h>
#include <river/debug.h>
river::CircularBuffer::CircularBuffer(const river::CircularBuffer& _obj) :
m_data(),
m_write(nullptr),
m_read(nullptr),
m_timeRead(),
m_capacity(0),
m_sizeChunk(0),
m_size(0) {
RIVER_CRITICAL("error");
};
/**
* @brief copy operator.
*/
river::CircularBuffer& river::CircularBuffer::operator=(const river::CircularBuffer& _obj) {
RIVER_CRITICAL("error");
return *this;
};
river::CircularBuffer::CircularBuffer() :
m_data(),
m_write(nullptr),
m_read(nullptr),
m_timeRead(),
m_capacity(0),
m_sizeChunk(0),
m_size(0) {
// nothing to do ...
}
river::CircularBuffer::~CircularBuffer() {
m_data.clear();
m_read = nullptr;
m_write = nullptr;
}
void river::CircularBuffer::setCapacity(size_t _capacity, size_t _chunkSize, uint32_t _frequency) {
if ( _chunkSize == m_sizeChunk
&& _capacity == m_capacity) {
clear();
return;
}
RIVER_DEBUG("buffer setCapacity(" << _capacity << "," << _chunkSize << ")");
m_data.clear();
m_write = nullptr;
m_read = nullptr;
m_frequency = _frequency;
m_capacity = _capacity;
m_sizeChunk = _chunkSize;
m_size = 0;
if ( _capacity == 0
|| _chunkSize == 0) {
m_capacity = 0;
m_sizeChunk = 0;
return;
}
m_data.resize(m_capacity*m_sizeChunk, 0);
m_read = &m_data[0];
m_write = &m_data[0];
}
void river::CircularBuffer::setCapacity(std::chrono::milliseconds _capacity, size_t _chunkSize, uint32_t _frequency) {
uint32_t nbSampleNeeded = _frequency*_capacity.count()/1000;
RIVER_DEBUG("buffer setCapacity(" << _capacity.count() << "ms ," << _chunkSize << ")");
setCapacity(nbSampleNeeded, _chunkSize, _frequency);
}
size_t river::CircularBuffer::getUsedSizeBeforEnd() const {
size_t size;
if (m_read < m_write) {
size = static_cast<uint8_t*>(m_write) - static_cast<uint8_t*>(m_read);
// the size result is in bytes we need to have it in element
size /= m_sizeChunk;
} else if ( m_read == m_write
&& m_size == 0) {
// no element in the buffer
size = 0;
} else {
size = &m_data[0] + (m_capacity*m_sizeChunk) - static_cast<uint8_t*>(m_read);
// the size result is in bytes we need to have it in element
size /= m_sizeChunk;
}
return size;
}
size_t river::CircularBuffer::getFreeSizeBeforEnd() const {
size_t size;
size = &m_data[0]
+ (m_capacity*m_sizeChunk)
- static_cast<uint8_t*>(m_write);
// the size result is in Octet we need to have it in element
size /= m_sizeChunk;
return size;
}
size_t river::CircularBuffer::write(const void* _data, size_t _nbChunk, const std::chrono::system_clock::time_point& _time) {
size_t nbElementDrop = 0;
size_t freeSizeBeforeEnd = getFreeSizeBeforEnd();
size_t freeSize = m_capacity - m_size;
// Write element in all case
// calculate the number of element that are overwritten
if (freeSize < _nbChunk) {
nbElementDrop = _nbChunk - freeSize;
}
// if User Request a write more important than the size of the buffer ==> update the pointer to feet only on the buffer size
if (m_capacity < _nbChunk) {
RIVER_WARNING("CircularBuffer Write too BIG " << _nbChunk << " buffer max size : " << m_capacity << " (keep last Elements)");
// Move data pointer
_data = static_cast<const uint8_t*>(_data) + (_nbChunk - m_capacity) * m_sizeChunk;
// update size
_nbChunk = m_capacity;
}
// if no element in the FIFO ==> first time write or no more data inside ==> start set the file of the read data ...
if (m_size == 0) {
m_timeRead = _time;
}
// TODO : Check time to push continuous data ...
if (freeSizeBeforeEnd >= _nbChunk) {
// all Data will be copy
memcpy(m_write, _data, _nbChunk * m_sizeChunk);
// update Writing pointer
if (freeSizeBeforeEnd == _nbChunk) {
// update to the end of FIFO ==> update to the start
m_write = &m_data[0];
} else {
m_write = static_cast<uint8_t*>(m_write) + _nbChunk * m_sizeChunk;
}
// Update the number of element in the buffer.
m_size += _nbChunk;
} else {
// copy data to the end of buffer
memcpy(m_write, _data, freeSizeBeforeEnd * m_sizeChunk);
// update Writing pointer ==> end of buffer ==> go to the start
m_write = &m_data[0];
// update data pointer
_data = static_cast<const uint8_t*>(_data) + freeSizeBeforeEnd * m_sizeChunk;
m_size += freeSizeBeforeEnd;
// get the number of element we need to write
_nbChunk -= freeSizeBeforeEnd;
// Copy the las data if needed
if (_nbChunk != 0) {
memcpy(m_write, _data, _nbChunk * m_sizeChunk);
// update Writing pointer
m_write = static_cast<uint8_t*>(m_write) + _nbChunk * m_sizeChunk;
m_size += _nbChunk;
}
}
if (nbElementDrop > 0) {
// if drop element we need to update the reading pointer
m_read = m_write;
m_size = m_capacity;
}
// return the number of element Overwrite
return nbElementDrop;
}
size_t river::CircularBuffer::read(void* _data, size_t _nbChunk) {
return read(_data, _nbChunk, m_timeRead);
}
size_t river::CircularBuffer::read(void* _data, size_t _nbChunk, const std::chrono::system_clock::time_point& _time) {
size_t nbElementDrop = 0;
// Critical section (theoriquely protected by Mutex)
size_t usedSizeBeforeEnd = getUsedSizeBeforEnd();
// verify if we have elements in the Buffer
if (0 < m_size) {
// check the time of the read :
std::chrono::nanoseconds deltaTime = m_timeRead - _time;
if (deltaTime.count() == 0) {
// nothing to do ==> just copy data ...
} else if (deltaTime.count() > 0) {
// Add empty sample in the output buffer ...
size_t nbSampleEmpty = m_frequency*deltaTime.count()/100000000;
nbSampleEmpty = std::min(nbSampleEmpty, _nbChunk);
RIVER_WARNING("add Empty sample in the output buffer " << nbSampleEmpty << " / " << _nbChunk);
memset(_data, 0, nbSampleEmpty * m_sizeChunk);
if (nbSampleEmpty == _nbChunk) {
return 0;
}
_nbChunk -= nbSampleEmpty;
} else {
// Remove data from the FIFO
setReadPosition(_time);
}
if (m_size < _nbChunk) {
nbElementDrop = _nbChunk - m_size;
_nbChunk = m_size;
}
m_timeRead += std::chrono::microseconds(_nbChunk*1000000/m_frequency);
if (usedSizeBeforeEnd >= _nbChunk) {
// all Data will be copy
memcpy(_data, m_read, _nbChunk * m_sizeChunk);
// update Writing pointer
m_read = static_cast<uint8_t*>(m_read) + _nbChunk * m_sizeChunk;
m_size -= _nbChunk;
// update output pointer in case of flush with 0 data
_data = static_cast<uint8_t*>(_data) + _nbChunk * m_sizeChunk;
} else {
// copy data to the end of buffer
memcpy(_data, m_read, usedSizeBeforeEnd * m_sizeChunk);
// update Writing pointer ==> end of buffer ==> go to the start
m_read = &m_data[0];
_data = static_cast<uint8_t*>(_data) + usedSizeBeforeEnd * m_sizeChunk;
m_size -= usedSizeBeforeEnd;
// get the number of element we need to write
_nbChunk -= usedSizeBeforeEnd;
// Copy the last data if needed
if (0 != _nbChunk) {
memcpy(_data, m_read, _nbChunk * m_sizeChunk);
// update Writing pointer
m_read = static_cast<uint8_t*>(m_read) + _nbChunk * m_sizeChunk;
m_size -= _nbChunk;
// update output pointer in case of flush with 0 data
_data = static_cast<uint8_t*>(_data) + _nbChunk * m_sizeChunk;
}
}
} else {
nbElementDrop = _nbChunk;
}
if (0 < nbElementDrop) {
// set 0 in last element of the output
memset(_data, 0, m_sizeChunk * nbElementDrop);
}
// return the number of element droped
return nbElementDrop;
}
void river::CircularBuffer::setReadPosition(const std::chrono::system_clock::time_point& _time) {
// Critical section (theoriquely protected by Mutex)
size_t usedSizeBeforeEnd = getUsedSizeBeforEnd();
if (0 < m_size) {
// check the time of the read :
std::chrono::nanoseconds deltaTime = m_timeRead - _time;
size_t nbSampleToRemove = m_frequency*-deltaTime.count()/100000000;
nbSampleToRemove = std::min(nbSampleToRemove, m_size);
RIVER_WARNING("Remove sample in the buffer " << nbSampleToRemove << " / " << m_size);
if (usedSizeBeforeEnd >= nbSampleToRemove) {
usedSizeBeforeEnd -= nbSampleToRemove;
m_size -= nbSampleToRemove;
m_read = static_cast<uint8_t*>(m_read) + nbSampleToRemove * m_sizeChunk;
} else {
nbSampleToRemove -= usedSizeBeforeEnd;
m_size -= nbSampleToRemove;
m_read = &m_data[0] + nbSampleToRemove*m_sizeChunk;
}
m_timeRead += deltaTime;
} else {
m_timeRead = std::chrono::system_clock::time_point();
}
}
size_t river::CircularBuffer::getFreeSize() const {
return m_capacity - m_size;
}
void river::CircularBuffer::clear() {
RIVER_DEBUG("buffer clear()");
// set pointer to the start
m_read = &m_data[0];
m_write = &m_data[0];
// Clean the number of element in the buffer
m_size = 0;
// Clean all element inside :
memset(&m_data[0], 0, m_sizeChunk * m_capacity);
}

139
river/CircularBuffer.h Normal file
View File

@ -0,0 +1,139 @@
/** @file
* @author Edouard DUPIN
* @copyright 2011, Edouard DUPIN, all right reserved
* @license APACHE v2.0 (see license file)
*/
#ifndef __RIVER_CIRCULAR_BUFFER_H__
#define __RIVER_CIRCULAR_BUFFER_H__
#include <etk/types.h>
#include <vector>
#include <chrono>
namespace river {
/**
* For these functions we have 4 solutions :
* - Free Buffer
* ----------------------------------------------------------
* m_data | | |
* ----------------------------------------------------------
* m_write
* m_read
* - Full Buffer
* ----------------------------------------------------------
* m_data |****************************|***************************|
* ----------------------------------------------------------
* m_write
* m_read
* - Buffer in used
* ----------------------------------------------------------
* m_data | |********************| |
* ----------------------------------------------------------
* m_read m_write
* - Buffer out used
* ----------------------------------------------------------
* m_data |****************| |******************|
* ----------------------------------------------------------
* m_read m_write
*/
class CircularBuffer {
private:
std::vector<uint8_t> m_data; //!< data pointer
void* m_write; //!< write pointer
void* m_read; //!< read pointer
std::chrono::system_clock::time_point m_timeRead; //!< current read time
uint32_t m_frequency;
// TODO : Remove the m_size ==> this is a bad element to be mutex-less
size_t m_size; //!< number of chunk availlable in this buffer
size_t m_capacity; //!< number of chunk available in this Buffer
size_t m_sizeChunk; //!< Size of one chunk (in byte)
public:
CircularBuffer();
~CircularBuffer();
/**
* @brief copy contructor.
* @param[in] _obj Circular buffer object
*/
CircularBuffer(const river::CircularBuffer& _obj);
/**
* @brief copy operator.
* @param[in] _obj Circular buffer object
*/
CircularBuffer& operator=(const river::CircularBuffer& _obj);
/**
* @brief set the capacity of the circular buffer.
* @param[in] _capacity Number of chunk in the buffer.
* @param[in] _chunkSize Size of one chunk.
* @param[in] _frequency Frequency of the buffer
*/
void setCapacity(size_t _capacity, size_t _chunkSize, uint32_t _frequency);
/**
* @brief set the capacity of the circular buffer.
* @param[in] _capacity time in millisecond stored in the buffer.
* @param[in] _chunkSize Size of one chunk.
* @param[in] _frequency Frequency of the buffer
*/
void setCapacity(std::chrono::milliseconds _capacity, size_t _chunkSize, uint32_t _frequency);
/**
* @brief get free size of the buffer.
* @return Number of free chunk.
*/
size_t getFreeSize() const;
/**
* @brief Get number of chunk in the buffer.
* @return number of chunk.
*/
size_t getSize() const {
return m_size;
}
/**
* @brief Get number of chunk that can be set in the buffer.
* @return number of chunk.
*/
size_t getCapacity() const {
return m_capacity;
}
/**
* @brief Write chunk in the buffer.
* @param[in] _data Pointer on the data.
* @param[in] _nbChunk number of chunk to copy.
* @param[in] _time Time to start write data (if before end ==> not replace data, write only if after end)
* @return Number of chunk copied.
*/
size_t write(const void* _data, size_t _nbChunk, const std::chrono::system_clock::time_point& _time);
/**
* @brief Read Chunk from the buffer to the pointer data.
* @param[out] _data Pointer on the data.
* @param[in] _nbChunk number of chunk to copy.
* @param[in] _time Time to start read data (if before start ==> add 0 at start, if after, remove unread data)
* @return Number of chunk copied.
*/
size_t read(void* _data, size_t _nbChunk, const std::chrono::system_clock::time_point& _time);
//! @previous
size_t read(void* _data, size_t _nbChunk);
void setReadPosition(const std::chrono::system_clock::time_point& _time);
std::chrono::system_clock::time_point getReadTimeStamp() {
return m_timeRead;
}
/**
* @brief Clear the buffer.
*/
void clear();
private:
/**
* @brief Get number of free chunks before end of buffer.
* @return Number of chunk.
*/
size_t getFreeSizeBeforEnd() const;
/**
* @brief Get number of used chunks before end of buffer.
* @return Number of chunk.
*/
size_t getUsedSizeBeforEnd() const;
};
}
#endif

View File

@ -45,7 +45,8 @@ bool river::Interface::init(const std::string& _name,
// register interface to be notify from the volume change.
m_node->registerAsRemote(shared_from_this());
// Create convertion interface
if (m_node->isInput() == true) {
if ( m_node->isInput() == true
&& m_mode == river::modeInterface_input) {
m_process.setInputConfig(m_node->getInterfaceFormat());
// add all time the volume stage :
std::shared_ptr<drain::Volume> algo = drain::Volume::create();
@ -59,7 +60,8 @@ bool river::Interface::init(const std::string& _name,
algo->addVolumeStage(tmpVolume);
}
m_process.setOutputConfig(drain::IOFormatInterface(_map, _format, _freq));
} else {
} else if ( m_node->isOutput() == true
&& m_mode == river::modeInterface_output) {
m_process.setInputConfig(drain::IOFormatInterface(_map, _format, _freq));
// add all time the volume stage :
std::shared_ptr<drain::Volume> algo = drain::Volume::create();
@ -73,6 +75,19 @@ bool river::Interface::init(const std::string& _name,
algo->addVolumeStage(tmpVolume);
}
m_process.setOutputConfig(m_node->getInterfaceFormat());
} else if ( m_node->isOutput() == true
&& m_mode == river::modeInterface_feedback) {
m_process.setInputConfig(m_node->getHarwareFormat());
// add all time the volume stage :
std::shared_ptr<drain::Volume> algo = drain::Volume::create();
//algo->setInputFormat(m_node->getInterfaceFormat());
algo->setName("volume");
m_process.pushBack(algo);
// note : feedback has no volume stage ...
m_process.setOutputConfig(drain::IOFormatInterface(_map, _format, _freq));
} else {
RIVER_ERROR("Can not link virtual interface with type : " << m_mode << " to a hardware interface " << (m_node->isInput()==true?"input":"output"));
return false;
}
return true;
}
@ -116,7 +131,7 @@ void river::Interface::setReadwrite() {
}
}
void river::Interface::setOutputCallback(size_t _chunkSize, drain::needDataFunction _function) {
void river::Interface::setOutputCallback(drain::playbackFunction _function) {
std::unique_lock<std::recursive_mutex> lock(m_mutex);
m_process.removeAlgoDynamic();
m_process.removeIfFirst<drain::EndPoint>();
@ -124,7 +139,7 @@ void river::Interface::setOutputCallback(size_t _chunkSize, drain::needDataFunct
m_process.pushFront(algo);
}
void river::Interface::setInputCallback(size_t _chunkSize, drain::haveNewDataFunction _function) {
void river::Interface::setInputCallback(drain::recordFunction _function) {
std::unique_lock<std::recursive_mutex> lock(m_mutex);
m_process.removeAlgoDynamic();
m_process.removeIfLast<drain::EndPoint>();
@ -132,7 +147,7 @@ void river::Interface::setInputCallback(size_t _chunkSize, drain::haveNewDataFun
m_process.pushBack(algo);
}
void river::Interface::setWriteCallback(drain::needDataFunctionWrite _function) {
void river::Interface::setWriteCallback(drain::playbackFunctionWrite _function) {
std::unique_lock<std::recursive_mutex> lock(m_mutex);
m_process.removeAlgoDynamic();
std::shared_ptr<drain::EndPointWrite> algo = m_process.get<drain::EndPointWrite>(0);

View File

@ -96,9 +96,9 @@ namespace river {
/**
* @brief When we want to implement a Callback Mode:
*/
virtual void setWriteCallback(drain::needDataFunctionWrite _function);
virtual void setOutputCallback(size_t _chunkSize, drain::needDataFunction _function);
virtual void setInputCallback(size_t _chunkSize, drain::haveNewDataFunction _function);
virtual void setWriteCallback(drain::playbackFunctionWrite _function);
virtual void setOutputCallback(drain::playbackFunction _function);
virtual void setInputCallback(drain::recordFunction _function);
/**
* @brief Add a volume group of the current channel.
* @note If you do not call this function with the group "FLOW" you chan not have a channel volume.

View File

@ -50,7 +50,6 @@ namespace river {
return m_process.getInputConfig();
}
}
protected:
const drain::IOFormatInterface& getHarwareFormat() {
if (m_isInput == true) {
return m_process.getInputConfig();

View File

@ -149,6 +149,7 @@ river::io::NodeAEC::NodeAEC(const std::string& _name, const std::shared_ptr<cons
*/
std::vector<audio::channel> feedbackMap;
feedbackMap.push_back(audio::channel_frontCenter);
RIVER_INFO("Create FEEDBACK : ");
m_interfaceFeedBack = createInput(hardwareFormat.getFrequency(),
feedbackMap,
hardwareFormat.getFormat(),
@ -158,6 +159,7 @@ river::io::NodeAEC::NodeAEC(const std::string& _name, const std::shared_ptr<cons
RIVER_ERROR("Can not opne virtual device ... map-on-feedback in " << _name);
return;
}
RIVER_INFO("Create MICROPHONE : ");
m_interfaceMicrophone = createInput(hardwareFormat.getFrequency(),
hardwareFormat.getMap(),
hardwareFormat.getFormat(),
@ -169,23 +171,30 @@ river::io::NodeAEC::NodeAEC(const std::string& _name, const std::shared_ptr<cons
}
// set callback mode ...
m_interfaceFeedBack->setInputCallback(1024,
std::bind(&river::io::NodeAEC::onDataReceivedFeedBack,
m_interfaceFeedBack->setInputCallback(std::bind(&river::io::NodeAEC::onDataReceivedFeedBack,
this,
std::placeholders::_1,
std::placeholders::_2,
std::placeholders::_3,
std::placeholders::_4,
std::placeholders::_5));
std::placeholders::_5,
std::placeholders::_6));
// set callback mode ...
m_interfaceMicrophone->setInputCallback(1024,
std::bind(&river::io::NodeAEC::onDataReceivedMicrophone,
m_interfaceMicrophone->setInputCallback(std::bind(&river::io::NodeAEC::onDataReceivedMicrophone,
this,
std::placeholders::_1,
std::placeholders::_2,
std::placeholders::_3,
std::placeholders::_4,
std::placeholders::_5));
std::placeholders::_5,
std::placeholders::_6));
m_bufferMicrophone.setCapacity(std::chrono::milliseconds(1000),
audio::getFormatBytes(hardwareFormat.getFormat())*hardwareFormat.getMap().size(),
hardwareFormat.getFrequency());
m_bufferFeedBack.setCapacity(std::chrono::milliseconds(1000),
audio::getFormatBytes(hardwareFormat.getFormat()), // only one channel ...
hardwareFormat.getFrequency());
m_process.updateInterAlgo();
}
@ -201,9 +210,11 @@ void river::io::NodeAEC::start() {
std::unique_lock<std::mutex> lock(m_mutex);
RIVER_INFO("Start stream : '" << m_name << "' mode=" << (m_isInput?"input":"output") );
if (m_interfaceFeedBack != nullptr) {
RIVER_INFO("Start FEEDBACK : ");
m_interfaceFeedBack->start();
}
if (m_interfaceMicrophone != nullptr) {
RIVER_INFO("Start Microphone : ");
m_interfaceMicrophone->start();
}
}
@ -220,35 +231,140 @@ void river::io::NodeAEC::stop() {
namespace std {
static std::ostream& operator <<(std::ostream& _os, const std::chrono::system_clock::time_point& _obj) {
std::chrono::microseconds us = std::chrono::duration_cast<std::chrono::microseconds>(_obj.time_since_epoch());
_os << us.count();
std::chrono::nanoseconds ns = std::chrono::duration_cast<std::chrono::nanoseconds>(_obj.time_since_epoch());
int64_t totalSecond = ns.count()/1000000000;
int64_t millisecond = (ns.count()%1000000000)/1000000;
int64_t microsecond = (ns.count()%1000000)/1000;
int64_t nanosecond = ns.count()%1000;
//_os << totalSecond << "s " << millisecond << "ms " << microsecond << "µs " << nanosecond << "ns";
int32_t second = totalSecond % 60;
int32_t minute = (totalSecond/60)%60;
int32_t hour = (totalSecond/3600)%24;
int32_t day = (totalSecond/(24*3600))%365;
int32_t year = totalSecond/(24*3600*365);
_os << year << "y " << day << "d " << hour << "h" << minute << ":"<< second << "s " << millisecond << "ms " << microsecond << "µs " << nanosecond << "ns";
return _os;
}
}
#define SAVE_FILE_MACRO(type,fileName,dataPointer,nbElement) \
do { \
static FILE *pointerOnFile = nullptr; \
static bool errorOpen = false; \
if (NULL==pointerOnFile) { \
RIVER_WARNING("open file '" << fileName << "' type=" << #type); \
pointerOnFile = fopen(fileName,"w"); \
if ( errorOpen == false \
&& pointerOnFile == nullptr) { \
RIVER_ERROR("ERROR OPEN file ... '" << fileName << "' type=" << #type); \
errorOpen=true; \
} \
} \
if (pointerOnFile != nullptr) { \
fwrite((dataPointer), sizeof(type), (nbElement), pointerOnFile); \
fflush(pointerOnFile); \
} \
}while(0)
void river::io::NodeAEC::onDataReceivedMicrophone(const std::chrono::system_clock::time_point& _time,
void river::io::NodeAEC::onDataReceivedMicrophone(const void* _data,
const std::chrono::system_clock::time_point& _time,
size_t _nbChunk,
const std::vector<audio::channel>& _map,
const void* _data,
enum audio::format _type) {
RIVER_INFO("Microphone Time=" << _time << " _nbChunk=" << _nbChunk << " _map=" << _map << " _type=" << _type);
if (_type != audio::format_int16) {
enum audio::format _format,
uint32_t _frequency,
const std::vector<audio::channel>& _map) {
RIVER_DEBUG("Microphone Time=" << _time << " _nbChunk=" << _nbChunk << " _map=" << _map << " _format=" << _format << " freq=" << _frequency);
if (_format != audio::format_int16) {
RIVER_ERROR("call wrong type ... (need int16_t)");
}
// push data synchronize
// if threaded : send event / otherwise, process ...
newInput(_data, _nbChunk, _time);
std::unique_lock<std::mutex> lock(m_mutex);
m_bufferMicrophone.write(_data, _nbChunk, _time);
SAVE_FILE_MACRO(int16_t, "REC_Microphone.raw", _data, _nbChunk*_map.size());
process();
}
void river::io::NodeAEC::onDataReceivedFeedBack(const std::chrono::system_clock::time_point& _time,
void river::io::NodeAEC::onDataReceivedFeedBack(const void* _data,
const std::chrono::system_clock::time_point& _time,
size_t _nbChunk,
const std::vector<audio::channel>& _map,
const void* _data,
enum audio::format _type) {
RIVER_INFO("FeedBack Time=" << _time << " _nbChunk=" << _nbChunk << " _map=" << _map << " _type=" << _type);
if (_type != audio::format_int16) {
enum audio::format _format,
uint32_t _frequency,
const std::vector<audio::channel>& _map) {
RIVER_DEBUG("FeedBack Time=" << _time << " _nbChunk=" << _nbChunk << " _map=" << _map << " _format=" << _format << " freq=" << _frequency);
if (_format != audio::format_int16) {
RIVER_ERROR("call wrong type ... (need int16_t)");
}
// TODO : Call synchro ...
// push data synchronize
std::unique_lock<std::mutex> lock(m_mutex);
m_bufferFeedBack.write(_data, _nbChunk, _time);
SAVE_FILE_MACRO(int16_t, "REC_FeedBack.raw", _data, _nbChunk*_map.size());
process();
}
void river::io::NodeAEC::process() {
if (m_bufferMicrophone.getSize() <= 256) {
return;
}
if (m_bufferFeedBack.getSize() <= 256) {
return;
}
std::chrono::system_clock::time_point MicTime = m_bufferMicrophone.getReadTimeStamp();
std::chrono::system_clock::time_point fbTime = m_bufferFeedBack.getReadTimeStamp();
// Synchronize if possible
if (MicTime < fbTime) {
RIVER_INFO("micTime < fbTime : Change Microphone time start " << fbTime);
RIVER_INFO(" old time stamp=" << m_bufferMicrophone.getReadTimeStamp());
m_bufferMicrophone.setReadPosition(fbTime);
RIVER_INFO(" new time stamp=" << m_bufferMicrophone.getReadTimeStamp());
}
/*
if (MicTime > fbTime) {
RIVER_INFO("micTime > fbTime : Change FeedBack time start " << fbTime);
RIVER_INFO(" old time stamp=" << m_bufferFeedBack.getReadTimeStamp());
m_bufferFeedBack.setReadPosition(MicTime);
RIVER_INFO(" new time stamp=" << m_bufferFeedBack.getReadTimeStamp());
}*/
// check if enought time after synchronisation ...
if (m_bufferMicrophone.getSize() <= 256) {
return;
}
if (m_bufferFeedBack.getSize() <= 256) {
return;
}
MicTime = m_bufferMicrophone.getReadTimeStamp();
fbTime = m_bufferFeedBack.getReadTimeStamp();
if (MicTime != fbTime) {
RIVER_ERROR("Can not synchronize flow ... : " << MicTime << " != " << fbTime << " delta = " << (MicTime-fbTime).count()/1000 << " µs");
return;
}
std::vector<uint8_t> dataMic;
std::vector<uint8_t> dataFB;
dataMic.resize(256*sizeof(int16_t)*2, 0);
dataFB.resize(256*sizeof(int16_t), 0);
while (true) {
MicTime = m_bufferMicrophone.getReadTimeStamp();
fbTime = m_bufferFeedBack.getReadTimeStamp();
RIVER_INFO(" process 256 samples ... " << MicTime);
m_bufferMicrophone.read(&dataMic[0], 256);
m_bufferFeedBack.read(&dataFB[0], 256);
SAVE_FILE_MACRO(int16_t, "REC_Microphone_sync.raw", &dataMic[0], 256*2);
SAVE_FILE_MACRO(int16_t, "REC_FeedBack_sync.raw", &dataFB[0], 256);
// if threaded : send event / otherwise, process ...
//processAEC(&dataMic[0], &dataFB[0], 256, _time);
if (m_bufferMicrophone.getSize() <= 256) {
return;
}
if (m_bufferFeedBack.getSize() <= 256) {
return;
}
}
}
void river::io::NodeAEC::processAEC(void* _dataMic, void* _dataFB, uint32_t _nbChunk, const std::chrono::system_clock::time_point& _time) {
newInput(_dataMic, _nbChunk, _time);
}

View File

@ -9,6 +9,7 @@
#include <river/io/Node.h>
#include <river/Interface.h>
#include <river/CircularBuffer.h>
namespace river {
namespace io {
@ -35,17 +36,22 @@ namespace river {
audio::format _format,
const std::string& _streamName,
const std::string& _name);
void onDataReceivedMicrophone(const std::chrono::system_clock::time_point& _playTime,
void onDataReceivedMicrophone(const void* _data,
const std::chrono::system_clock::time_point& _time,
size_t _nbChunk,
const std::vector<audio::channel>& _map,
const void* _data,
enum audio::format _type);
void onDataReceivedFeedBack(const std::chrono::system_clock::time_point& _readTime,
enum audio::format _format,
uint32_t _frequency,
const std::vector<audio::channel>& _map);
void onDataReceivedFeedBack(const void* _data,
const std::chrono::system_clock::time_point& _time,
size_t _nbChunk,
const std::vector<audio::channel>& _map,
const void* _data,
enum audio::format _type);
enum audio::format _format,
uint32_t _frequency,
const std::vector<audio::channel>& _map);
river::CircularBuffer m_bufferMicrophone;
river::CircularBuffer m_bufferFeedBack;
void process();
void processAEC(void* _dataMic, void* _dataFB, uint32_t _nbChunk, const std::chrono::system_clock::time_point& _time);
};
}
}

View File

@ -31,23 +31,48 @@ namespace std {
}
int32_t river::io::NodeAirTAudio::airtAudioCallback(void* _outputBuffer,
void* _inputBuffer,
int32_t river::io::NodeAirTAudio::duplexCallback(const void* _inputBuffer,
const std::chrono::system_clock::time_point& _timeInput,
void* _outputBuffer,
const std::chrono::system_clock::time_point& _timeOutput,
uint32_t _nbChunk,
const std::chrono::system_clock::time_point& _time,
airtaudio::status _status) {
const std::vector<airtaudio::status>& _status) {
std::unique_lock<std::mutex> lock(m_mutex);
if (_outputBuffer != nullptr) {
RIVER_VERBOSE("data Output size request :" << _nbChunk << " [BEGIN] status=" << _status << " nbIO=" << m_list.size());
newOutput(_outputBuffer, _nbChunk, _time);
}
// TODO : Manage status ...
if (_inputBuffer != nullptr) {
RIVER_VERBOSE("data Input size request :" << _nbChunk << " [BEGIN] status=" << _status << " nbIO=" << m_list.size());
newInput(_inputBuffer, _nbChunk, _time);
newInput(_inputBuffer, _nbChunk, _timeInput);
}
if (_outputBuffer != nullptr) {
RIVER_VERBOSE("data Output size request :" << _nbChunk << " [BEGIN] status=" << _status << " nbIO=" << m_list.size());
newOutput(_outputBuffer, _nbChunk, _timeOutput);
}
return 0;
}
int32_t river::io::NodeAirTAudio::recordCallback(const void* _inputBuffer,
const std::chrono::system_clock::time_point& _timeInput,
uint32_t _nbChunk,
const std::vector<airtaudio::status>& _status) {
std::unique_lock<std::mutex> lock(m_mutex);
// TODO : Manage status ...
RIVER_VERBOSE("data Input size request :" << _nbChunk << " [BEGIN] status=" << _status << " nbIO=" << m_list.size());
newInput(_inputBuffer, _nbChunk, _timeInput);
return 0;
}
int32_t river::io::NodeAirTAudio::playbackCallback(void* _outputBuffer,
const std::chrono::system_clock::time_point& _timeOutput,
uint32_t _nbChunk,
const std::vector<airtaudio::status>& _status) {
std::unique_lock<std::mutex> lock(m_mutex);
// TODO : Manage status ...
RIVER_VERBOSE("data Output size request :" << _nbChunk << " [BEGIN] status=" << _status << " nbIO=" << m_list.size());
newOutput(_outputBuffer, _nbChunk, _timeOutput);
return 0;
}
std::shared_ptr<river::io::NodeAirTAudio> river::io::NodeAirTAudio::create(const std::string& _name, const std::shared_ptr<const ejson::Object>& _config) {
return std::shared_ptr<river::io::NodeAirTAudio>(new river::io::NodeAirTAudio(_name, _config));
@ -78,6 +103,7 @@ river::io::NodeAirTAudio::NodeAirTAudio(const std::string& _name, const std::sha
// intanciate specific API ...
m_adac.instanciate(typeInterface);
m_adac.setName(_name);
// TODO : Check return ...
std::string type = m_config->getStringValue("type", "int16");
if (streamName == "") {
@ -186,24 +212,22 @@ river::io::NodeAirTAudio::NodeAirTAudio(const std::string& _name, const std::sha
if (m_isInput == true) {
err = m_adac.openStream(nullptr, &params,
hardwareFormat.getFormat(), hardwareFormat.getFrequency(), &m_rtaudioFrameSize,
std::bind(&river::io::NodeAirTAudio::airtAudioCallback,
std::bind(&river::io::NodeAirTAudio::recordCallback,
this,
std::placeholders::_1,
std::placeholders::_2,
std::placeholders::_3,
std::placeholders::_4,
std::placeholders::_5)
std::placeholders::_5,
std::placeholders::_6)
);
} else {
err = m_adac.openStream(&params, nullptr,
hardwareFormat.getFormat(), hardwareFormat.getFrequency(), &m_rtaudioFrameSize,
std::bind(&river::io::NodeAirTAudio::airtAudioCallback,
std::bind(&river::io::NodeAirTAudio::playbackCallback,
this,
std::placeholders::_1,
std::placeholders::_2,
std::placeholders::_3,
std::placeholders::_4,
std::placeholders::_5)
std::placeholders::_5,
std::placeholders::_6)
);
}
if (err != airtaudio::error_none) {

View File

@ -29,11 +29,20 @@ namespace river {
airtaudio::DeviceInfo m_info;
unsigned int m_rtaudioFrameSize;
public:
int32_t airtAudioCallback(void* _outputBuffer,
void * _inputBuffer,
int32_t duplexCallback(const void* _inputBuffer,
const std::chrono::system_clock::time_point& _timeInput,
void* _outputBuffer,
const std::chrono::system_clock::time_point& _timeOutput,
uint32_t _nbChunk,
const std::chrono::system_clock::time_point& _time,
airtaudio::status _status);
const std::vector<airtaudio::status>& _status);
int32_t recordCallback(const void* _inputBuffer,
const std::chrono::system_clock::time_point& _timeInput,
uint32_t _nbChunk,
const std::vector<airtaudio::status>& _status);
int32_t playbackCallback(void* _outputBuffer,
const std::chrono::system_clock::time_point& _timeOutput,
uint32_t _nbChunk,
const std::vector<airtaudio::status>& _status);
protected:
virtual void start();
virtual void stop();

View File

@ -12,6 +12,7 @@
#include <math.h>
#include <sstream>
#include <thread>
#include <unistd.h>
#undef __class__
@ -108,13 +109,15 @@ class testOutWriteCallback {
std::placeholders::_1,
std::placeholders::_2,
std::placeholders::_3,
std::placeholders::_4));
std::placeholders::_4,
std::placeholders::_5));
}
void onDataNeeded(const std::chrono::system_clock::time_point& _playTime,
const size_t& _nbChunk,
const std::vector<audio::channel>& _map,
enum audio::format _type) {
if (_type != audio::format_int16) {
void onDataNeeded(const std::chrono::system_clock::time_point& _time,
size_t _nbChunk,
enum audio::format _format,
uint32_t _frequency,
const std::vector<audio::channel>& _map) {
if (_format != audio::format_int16) {
APPL_ERROR("call wrong type ... (need int16_t)");
}
std::vector<int16_t> data;
@ -170,21 +173,22 @@ class testOutCallback {
_io,
"WriteModeCallback");
// set callback mode ...
m_interface->setOutputCallback(1024,
std::bind(&testOutCallback::onDataNeeded,
m_interface->setOutputCallback(std::bind(&testOutCallback::onDataNeeded,
this,
std::placeholders::_1,
std::placeholders::_2,
std::placeholders::_3,
std::placeholders::_4,
std::placeholders::_5));
std::placeholders::_5,
std::placeholders::_6));
}
void onDataNeeded(const std::chrono::system_clock::time_point& _playTime,
const size_t& _nbChunk,
const std::vector<audio::channel>& _map,
void* _data,
enum audio::format _type) {
if (_type != audio::format_int16) {
void onDataNeeded(void* _data,
const std::chrono::system_clock::time_point& _time,
size_t _nbChunk,
enum audio::format _format,
uint32_t _frequency,
const std::vector<audio::channel>& _map) {
if (_format != audio::format_int16) {
APPL_ERROR("call wrong type ... (need int16_t)");
}
int16_t* data = static_cast<int16_t*>(_data);
@ -305,21 +309,22 @@ class testInCallback {
_input,
"WriteModeCallback");
// set callback mode ...
m_interface->setInputCallback(1024,
std::bind(&testInCallback::onDataReceived,
m_interface->setInputCallback(std::bind(&testInCallback::onDataReceived,
this,
std::placeholders::_1,
std::placeholders::_2,
std::placeholders::_3,
std::placeholders::_4,
std::placeholders::_5));
std::placeholders::_5,
std::placeholders::_6));
}
void onDataReceived(const std::chrono::system_clock::time_point& _readTime,
void onDataReceived(const void* _data,
const std::chrono::system_clock::time_point& _time,
size_t _nbChunk,
const std::vector<audio::channel>& _map,
const void* _data,
enum audio::format _type) {
if (_type != audio::format_int16) {
enum audio::format _format,
uint32_t _frequency,
const std::vector<audio::channel>& _map) {
if (_format != audio::format_int16) {
APPL_ERROR("call wrong type ... (need int16_t)");
}
const int16_t* data = static_cast<const int16_t*>(_data);
@ -347,15 +352,6 @@ TEST(TestALL, testInputCallBack) {
process.reset();
usleep(500000);
}
TEST(TestALL, testInputCallBackMicClean) {
std::shared_ptr<river::Manager> manager;
manager = river::Manager::create("testApplication");
APPL_INFO("test input (callback mode)");
std::shared_ptr<testInCallback> process = std::make_shared<testInCallback>(manager, "microphone-clean");
process->run();
process.reset();
usleep(500000);
}
@ -400,23 +396,24 @@ class testOutCallbackType {
"speaker",
"WriteModeCallbackType");
// set callback mode ...
m_interface->setOutputCallback(1024,
std::bind(&testOutCallbackType::onDataNeeded,
m_interface->setOutputCallback(std::bind(&testOutCallbackType::onDataNeeded,
this,
std::placeholders::_1,
std::placeholders::_2,
std::placeholders::_3,
std::placeholders::_4,
std::placeholders::_5));
std::placeholders::_5,
std::placeholders::_6));
}
void onDataNeeded(const std::chrono::system_clock::time_point& _playTime,
const size_t& _nbChunk,
const std::vector<audio::channel>& _map,
void* _data,
enum audio::format _type) {
//APPL_DEBUG("Get data ... " << _type << " map=" << _map << " chunk=" << _nbChunk);
void onDataNeeded(void* _data,
const std::chrono::system_clock::time_point& _time,
size_t _nbChunk,
enum audio::format _format,
uint32_t _frequency,
const std::vector<audio::channel>& _map) {
//APPL_DEBUG("Get data ... " << _format << " map=" << _map << " chunk=" << _nbChunk);
double baseCycle = 2.0*M_PI/double(m_freq) * double(m_generateFreq);
if (_type == audio::format_int16) {
if (_format == audio::format_int16) {
int16_t* data = static_cast<int16_t*>(_data);
for (int32_t iii=0; iii<_nbChunk; iii++) {
for (int32_t jjj=0; jjj<_map.size(); jjj++) {
@ -427,7 +424,7 @@ class testOutCallbackType {
m_phase -= 2*M_PI;
}
}
} else if (_type == audio::format_int16_on_int32) {
} else if (_format == audio::format_int16_on_int32) {
int32_t* data = static_cast<int32_t*>(_data);
for (int32_t iii=0; iii<_nbChunk; iii++) {
for (int32_t jjj=0; jjj<_map.size(); jjj++) {
@ -438,7 +435,7 @@ class testOutCallbackType {
m_phase -= 2*M_PI;
}
}
} else if (_type == audio::format_int32) {
} else if (_format == audio::format_int32) {
int32_t* data = static_cast<int32_t*>(_data);
for (int32_t iii=0; iii<_nbChunk; iii++) {
for (int32_t jjj=0; jjj<_map.size(); jjj++) {
@ -449,7 +446,7 @@ class testOutCallbackType {
m_phase -= 2*M_PI;
}
}
} else if (_type == audio::format_float) {
} else if (_format == audio::format_float) {
float* data = static_cast<float*>(_data);
for (int32_t iii=0; iii<_nbChunk; iii++) {
for (int32_t jjj=0; jjj<_map.size(); jjj++) {
@ -539,25 +536,23 @@ class testCallbackVolume {
"speaker",
"WriteModeCallback");
// set callback mode ...
m_interface->setOutputCallback(1024,
std::bind(&testCallbackVolume::onDataNeeded,
m_interface->setOutputCallback(std::bind(&testCallbackVolume::onDataNeeded,
this,
std::placeholders::_1,
std::placeholders::_2,
std::placeholders::_3,
std::placeholders::_4,
std::placeholders::_5));
std::placeholders::_5,
std::placeholders::_6));
m_interface->addVolumeGroup("MEDIA");
m_interface->addVolumeGroup("FLOW");
}
void onDataNeeded(const std::chrono::system_clock::time_point& _playTime,
const size_t& _nbChunk,
const std::vector<audio::channel>& _map,
void* _data,
enum audio::format _type) {
if (_type != audio::format_int16) {
APPL_ERROR("call wrong type ... (need int16_t)");
}
void onDataNeeded(void* _data,
const std::chrono::system_clock::time_point& _time,
size_t _nbChunk,
enum audio::format _format,
uint32_t _frequency,
const std::vector<audio::channel>& _map) {
int16_t* data = static_cast<int16_t*>(_data);
double baseCycle = 2.0*M_PI/(double)48000 * (double)550;
for (int32_t iii=0; iii<_nbChunk; iii++) {
@ -610,6 +605,28 @@ class testCallbackVolume {
}
};
void threadVolume(void* _userData) {
std::shared_ptr<river::Manager> manager;
manager = river::Manager::create("testApplication");
std::shared_ptr<testCallbackVolume> process = std::make_shared<testCallbackVolume>(manager);
process->run();
process.reset();
usleep(500000);
}
TEST(TestALL, testInputCallBackMicClean) {
std::shared_ptr<river::Manager> manager;
manager = river::Manager::create("testApplication");
std::thread tmpThread(&threadVolume, nullptr);
usleep(100000);
APPL_INFO("test input (callback mode)");
std::shared_ptr<testInCallback> process = std::make_shared<testInCallback>(manager, "microphone-clean");
process->run();
process.reset();
usleep(500000);
}
TEST(TestALL, testVolume) {
std::shared_ptr<river::Manager> manager;