[DEV] rework hw synchronisme to group interface and correct parsing of timestamp mode

This commit is contained in:
Edouard DUPIN 2015-03-03 21:28:07 +01:00
parent 76bc3c9322
commit d431f9d790
12 changed files with 310 additions and 64 deletions

View File

@ -10,10 +10,10 @@
# name of the interface
# name:"default",
name:"hw:0,0",
timestamp-mode:"hardware",
timestamp-mode:"trigered",
},
# lik hardware ti the specified IO name :
hw-link:"speaker",
# Link 2 ios with the same time in low level (named group) :
group:"baseIOSynchrone",
# frequency to open device
frequency:48000,
# mapping of the harware device (mapping is not get under)
@ -32,8 +32,9 @@
interface:"alsa",
#name:"default",
name:"hw:0,0",
timestamp-mode:"hardware",
timestamp-mode:"trigered",
},
group:"baseIOSynchrone",
frequency:48000,
channel-map:[
"front-left", "front-right",

View File

@ -15,6 +15,7 @@ def create(target):
'river/Manager.cpp',
'river/Interface.cpp',
'river/CircularBuffer.cpp',
'river/io/Group.cpp',
'river/io/Node.cpp',
'river/io/NodeAirTAudio.cpp',
'river/io/NodePortAudio.cpp',

View File

@ -234,16 +234,18 @@ size_t river::CircularBuffer::read(void* _data, size_t _nbChunk, const std11::ch
// return the number of element droped
return nbElementDrop;
}
void river::CircularBuffer::setReadPosition(const std11::chrono::system_clock::time_point& _time) {
// Critical section (theoriquely protected by Mutex)
size_t usedSizeBeforeEnd = getUsedSizeBeforEnd();
if (0 < m_size) {
// check the time of the read :
std11::chrono::nanoseconds deltaTime = _time - m_timeRead;
size_t nbSampleToRemove = m_frequency*deltaTime.count()/1000000000;
size_t nbSampleToRemove = int64_t(m_frequency)*int64_t(deltaTime.count())/1000000000LL;
nbSampleToRemove = std::min(nbSampleToRemove, m_size);
RIVER_WARNING("Remove sample in the buffer " << nbSampleToRemove << " / " << m_size);
std11::chrono::nanoseconds updateTime((nbSampleToRemove*1000000000)/int64_t(m_frequency));
RIVER_VERBOSE("Remove sample in the buffer " << nbSampleToRemove << " / " << m_size);
std11::chrono::nanoseconds updateTime((int64_t(nbSampleToRemove)*1000000000LL)/int64_t(m_frequency));
RIVER_VERBOSE(" add time : " << updateTime.count() << "ns / " << deltaTime.count() << "ns");
if (usedSizeBeforeEnd >= nbSampleToRemove) {
usedSizeBeforeEnd -= nbSampleToRemove;
m_size -= nbSampleToRemove;
@ -260,6 +262,7 @@ void river::CircularBuffer::setReadPosition(const std11::chrono::system_clock::t
}
}
size_t river::CircularBuffer::getFreeSize() const {
return m_capacity - m_size;
}

135
river/io/Group.cpp Normal file
View File

@ -0,0 +1,135 @@
/** @file
* @author Edouard DUPIN
* @copyright 2015, Edouard DUPIN, all right reserved
* @license APACHE v2.0 (see license file)
*/
#include <river/io/Group.h>
#include <river/debug.h>
#include "Node.h"
#include "NodeAEC.h"
#include "NodeAirTAudio.h"
#include "NodePortAudio.h"
#include "Node.h"
#undef __class__
#define __class__ "io::Group"
void river::io::Group::createFrom(const ejson::Document& _obj, const std::string& _name) {
RIVER_INFO("Create Group[" << _name << "] (START) ___________________________");
for (size_t iii=0; iii<_obj.size(); ++iii) {
const std11::shared_ptr<const ejson::Object> tmpObject = _obj.getObject(_obj.getKey(iii));
if (tmpObject == nullptr) {
continue;
}
std::string groupName = tmpObject->getStringValue("group", "");
if (groupName == _name) {
RIVER_INFO("Add element in Group[" << _name << "]: " << _obj.getKey(iii));
// get type : io
std::string ioType = tmpObject->getStringValue("io", "error");
#ifdef __AIRTAUDIO_INFERFACE__
if ( ioType == "input"
|| ioType == "output") {
std11::shared_ptr<river::io::Node> tmp = river::io::NodeAirTAudio::create(_obj.getKey(iii), tmpObject);
tmp->setGroup(shared_from_this());
m_list.push_back(tmp);
}
#endif
#ifdef __PORTAUDIO_INFERFACE__
if ( ioType == "PAinput"
|| ioType == "PAoutput") {
std11::shared_ptr<river::io::Node> tmp = river::io::NodePortAudio::create(_obj.getKey(iii), tmpObject);
tmp->setGroup(shared_from_this());
m_list.push_back(tmp);
}
#endif
}
}
// Link all the IO together : (not needed if one device ...
// Note : The interlink work only for alsa (NOW) and with AirTAudio...
if(m_list.size() > 1) {
#ifdef __AIRTAUDIO_INFERFACE__
std11::shared_ptr<river::io::NodeAirTAudio> linkRef = std11::dynamic_pointer_cast<river::io::NodeAirTAudio>(m_list[0]);
for (size_t iii=1; iii<m_list.size(); ++iii) {
if (m_list[iii] != nullptr) {
std11::shared_ptr<river::io::NodeAirTAudio> link = std11::dynamic_pointer_cast<river::io::NodeAirTAudio>(m_list[iii]);
linkRef->m_adac.isMasterOf(link->m_adac);
}
}
#endif
}
/*
// manage Link Between Nodes :
if (m_link != nullptr) {
RIVER_INFO("******** START LINK ************");
std11::shared_ptr<river::io::NodeAirTAudio> link = std11::dynamic_pointer_cast<river::io::NodeAirTAudio>(m_link);
if (link == nullptr) {
RIVER_ERROR("Can not link 2 Interface with not the same type (reserved for HW interface)");
return;
}
link->m_adac.isMasterOf(m_adac);
// TODO : Add return ...
RIVER_INFO("******** LINK might be done ************");
}
*/
RIVER_INFO("Create Group[" << _name << "] ( END ) ___________________________");
RIVER_INFO("Group[" << _name << "] List elements : ");
for (size_t iii=0; iii<m_list.size(); ++iii) {
if (m_list[iii] != nullptr) {
RIVER_INFO(" " << m_list[iii]->getName());
}
}
}
std11::shared_ptr<river::io::Node> river::io::Group::getNode(const std::string& _name) {
for (size_t iii=0; iii<m_list.size(); ++iii) {
if (m_list[iii] != nullptr) {
if (m_list[iii]->getName() == _name) {
return m_list[iii];
}
}
}
return std11::shared_ptr<river::io::Node>();
}
void river::io::Group::start() {
RIVER_ERROR("request start ");
int32_t count = 0;
for (size_t iii=0; iii<m_list.size(); ++iii) {
if (m_list[iii] != nullptr) {
count += m_list[iii]->getNumberOfInterface();
}
}
RIVER_ERROR(" have " << count << " interfaces ...");
if (count == 1) {
RIVER_ERROR("GROUP :::::::::::: START() [START]");
for (size_t iii=0; iii<m_list.size(); ++iii) {
if (m_list[iii] != nullptr) {
m_list[iii]->start();
}
}
RIVER_ERROR("GROUP :::::::::::: START() [DONE]");
}
}
void river::io::Group::stop() {
RIVER_ERROR("request stop ");
int32_t count = 0;
for (size_t iii=0; iii<m_list.size(); ++iii) {
if (m_list[iii] != nullptr) {
count += m_list[iii]->getNumberOfInterface();
}
}
RIVER_ERROR(" have " << count << " interfaces ...");
if (count == 0) {
RIVER_ERROR("GROUP :::::::::::: STOP() [START]");
for (size_t iii=0; iii<m_list.size(); ++iii) {
if (m_list[iii] != nullptr) {
m_list[iii]->stop();
}
}
RIVER_ERROR("GROUP :::::::::::: STOP() [DONE]");
}
}

34
river/io/Group.h Normal file
View File

@ -0,0 +1,34 @@
/** @file
* @author Edouard DUPIN
* @copyright 2015, Edouard DUPIN, all right reserved
* @license APACHE v2.0 (see license file)
*/
#ifndef __RIVER_IO_GROUP_H__
#define __RIVER_IO_GROUP_H__
#include <string>
#include <vector>
#include <ejson/ejson.h>
namespace river {
namespace io {
class Node;
class Manager;
class Group : public std11::enable_shared_from_this<Group> {
public:
Group() {}
~Group() {}
private:
std::vector< std11::shared_ptr<Node> > m_list;
public:
void createFrom(const ejson::Document& _obj, const std::string& _name);
std11::shared_ptr<river::io::Node> getNode(const std::string& _name);
void start();
void stop();
};
}
}
#endif

View File

@ -11,6 +11,9 @@
#include "NodeAirTAudio.h"
#include "NodePortAudio.h"
#include <etk/os/FSNode.h>
#include <etk/memory.h>
#include <etk/types.h>
#include <utility>
#undef __class__
#define __class__ "io::Manager"
@ -84,40 +87,74 @@ std11::shared_ptr<river::io::Manager> river::io::Manager::getInstance() {
std11::shared_ptr<river::io::Node> river::io::Manager::getNode(const std::string& _name) {
RIVER_WARNING("Get node : " << _name);
// search in the standalone list :
for (size_t iii=0; iii<m_list.size(); ++iii) {
std11::shared_ptr<river::io::Node> tmppp = m_list[iii].lock();
if ( tmppp != nullptr
&& _name == tmppp->getName()) {
RIVER_WARNING(" find it ... ");
RIVER_WARNING(" find it ... in standalone");
return tmppp;
}
}
// search in the group list:
{
for (std::map<std::string, std11::shared_ptr<river::io::Group> >::iterator it(m_listGroup.begin());
it != m_listGroup.end();
++it) {
if (it->second != nullptr) {
std11::shared_ptr<river::io::Node> node = it->second->getNode(_name);
if (node != nullptr) {
RIVER_WARNING(" find it ... in group: " << it->first);
return node;
}
}
}
}
RIVER_WARNING("Create a new one : " << _name);
// check if the node can be open :
const std11::shared_ptr<const ejson::Object> tmpObject = m_config.getObject(_name);
if (tmpObject != nullptr) {
//Check if it is in a group:
std::string groupName = tmpObject->getStringValue("group", "");
// get type : io
std::string ioType = tmpObject->getStringValue("io", "error");
#ifdef __AIRTAUDIO_INFERFACE__
if ( ioType == "input"
|| ioType == "output") {
std11::shared_ptr<river::io::Node> tmp = river::io::NodeAirTAudio::create(_name, tmpObject);
m_list.push_back(tmp);
return tmp;
} else
#endif
#ifdef __PORTAUDIO_INFERFACE__
if ( ioType == "PAinput"
|| ioType == "PAoutput") {
std11::shared_ptr<river::io::Node> tmp = river::io::NodePortAudio::create(_name, tmpObject);
m_list.push_back(tmp);
return tmp;
} else
#endif
if (ioType == "aec") {
std11::shared_ptr<river::io::Node> tmp = river::io::NodeAEC::create(_name, tmpObject);
m_list.push_back(tmp);
return tmp;
if ( groupName != ""
&& ( ioType == "input"
|| ioType == "output"
|| ioType == "PAinput"
|| ioType == "PAoutput") ) {
std11::shared_ptr<river::io::Group> tmpGroup = getGroup(groupName);
if (tmpGroup == nullptr) {
RIVER_WARNING("Can not get group ... '" << groupName << "'");
return std11::shared_ptr<river::io::Node>();
}
return tmpGroup->getNode(_name);
} else {
if (groupName != "") {
RIVER_WARNING("Group is only availlable for Hardware interface ... '" << _name << "'");
}
// TODO : Create a standalone group for every single element ==> simplify understanding ... but not for virtual interface ...
#ifdef __AIRTAUDIO_INFERFACE__
if ( ioType == "input"
|| ioType == "output") {
std11::shared_ptr<river::io::Node> tmp = river::io::NodeAirTAudio::create(_name, tmpObject);
m_list.push_back(tmp);
return tmp;
}
#endif
#ifdef __PORTAUDIO_INFERFACE__
if ( ioType == "PAinput"
|| ioType == "PAoutput") {
std11::shared_ptr<river::io::Node> tmp = river::io::NodePortAudio::create(_name, tmpObject);
m_list.push_back(tmp);
return tmp;
}
#endif
if (ioType == "aec") {
std11::shared_ptr<river::io::Node> tmp = river::io::NodeAEC::create(_name, tmpObject);
m_list.push_back(tmp);
return tmp;
}
}
}
RIVER_ERROR("Can not create the interface : '" << _name << "' the node is not DEFINED in the configuration file availlable : " << m_config.getKeys());
@ -196,3 +233,24 @@ void river::io::Manager::generateDot(const std::string& _filename) {
node.fileClose();
RIVER_INFO("Generate the DOT files: " << node << " (DONE)");
}
std11::shared_ptr<river::io::Group> river::io::Manager::getGroup(const std::string& _name) {
std11::shared_ptr<river::io::Group> out;
std::map<std::string, std11::shared_ptr<river::io::Group> >::iterator it = m_listGroup.find(_name);
if (it == m_listGroup.end()) {
RIVER_INFO("Create a new group: " << _name << " (START)");
out = std11::make_shared<river::io::Group>();
if (out != nullptr) {
out->createFrom(m_config, _name);
std::pair<std::string, std11::shared_ptr<river::io::Group> > plop(std::string(_name), out);
m_listGroup.insert(plop);
RIVER_INFO("Create a new group: " << _name << " ( END )");
} else {
RIVER_ERROR("Can not create new group: " << _name << " ( END )");
}
} else {
out = it->second;
}
return out;
}

View File

@ -9,6 +9,7 @@
#include <string>
#include <vector>
#include <map>
#include <list>
#include <stdint.h>
#include <etk/chrono.h>
@ -18,6 +19,7 @@
#include <audio/channel.h>
#include <ejson/ejson.h>
#include <drain/Volume.h>
#include <river/io/Group.h>
namespace river {
namespace io {
@ -73,6 +75,10 @@ namespace river {
* @param[in] _filename Name of the file to write data.
*/
virtual void generateDot(const std::string& _filename);
private:
std::map<std::string, std11::shared_ptr<river::io::Group> > m_listGroup; //!< List of all groups
std11::shared_ptr<river::io::Group> getGroup(const std::string& _name);
};
}
}

View File

@ -93,19 +93,6 @@ river::io::Node::Node(const std::string& _name, const std11::shared_ptr<const ej
m_process.setInputConfig(interfaceFormat);
}
//m_process.updateInterAlgo();
std::string linkWith = m_config->getStringValue("hw-link", "");
if (linkWith != "") {
std11::shared_ptr<Manager> mng = river::io::Manager::getInstance();
if (mng == nullptr) {
return;
}
m_link = mng->getNode(linkWith);
if (m_link == nullptr) {
RIVER_ERROR("can not link 2 interfaces ...");
} else {
RIVER_INFO("******** REQUEST LINK interface ************");
}
}
}
river::io::Node::~Node() {
@ -150,7 +137,7 @@ void river::io::Node::interfaceAdd(const std11::shared_ptr<river::Interface>& _i
m_list.push_back(_interface);
}
if (m_list.size() == 1) {
start();
startInGroup();
}
}
@ -166,7 +153,7 @@ void river::io::Node::interfaceRemove(const std11::shared_ptr<river::Interface>&
}
}
if (m_list.size() == 0) {
stop();
stopInGroup();
}
return;
}
@ -225,7 +212,7 @@ int32_t river::io::Node::newOutput(void* _outputBuffer,
RIVER_VERBOSE(" IO name="<< m_list[iii]->getName());
// clear datas ...
memset(&outputTmp2[0], 0, sizeof(int32_t)*m_process.getInputConfig().getMap().size()*_nbChunk);
RIVER_VERBOSE(" request Data="<< _nbChunk);
RIVER_VERBOSE(" request Data="<< _nbChunk << " time=" << _time);
m_list[iii]->systemNeedOutputData(_time, &outputTmp2[0], _nbChunk, sizeof(int32_t)*m_process.getInputConfig().getMap().size());
RIVER_VERBOSE(" Mix it ...");
outputTmp = reinterpret_cast<const int32_t*>(&outputTmp2[0]);
@ -244,7 +231,7 @@ int32_t river::io::Node::newOutput(void* _outputBuffer,
if (m_list[iii]->getMode() != river::modeInterface_feedback) {
continue;
}
RIVER_VERBOSE(" IO name="<< m_list[iii]->getName() << " (feedback)");
RIVER_VERBOSE(" IO name="<< m_list[iii]->getName() << " (feedback) time=" << _time);
m_list[iii]->systemNewInputData(_time, _outputBuffer, _nbChunk);
}
RIVER_VERBOSE("data Output size request :" << _nbChunk << " [ END ]");
@ -333,3 +320,21 @@ void river::io::Node::generateDot(etk::FSNode& _node) {
}
}
void river::io::Node::startInGroup() {
std11::shared_ptr<river::io::Group> group = m_group.lock();
if (group != nullptr) {
group->start();
} else {
start();
}
}
void river::io::Node::stopInGroup() {
std11::shared_ptr<river::io::Group> group = m_group.lock();
if (group != nullptr) {
group->stop();
} else {
stop();
}
}

View File

@ -26,7 +26,9 @@
namespace river {
namespace io {
class Manager;
class Group;
class Node : public std11::enable_shared_from_this<Node> {
friend river::io::Group;
protected:
uint32_t m_uid; // uniqueNodeID
protected:
@ -63,12 +65,14 @@ namespace river {
protected:
std11::shared_ptr<drain::VolumeElement> m_volume; //!< if a volume is set it is set here ...
std11::shared_ptr<river::io::Node> m_link;
protected:
std::vector<std11::weak_ptr<river::Interface> > m_listAvaillable; //!< List of all interface that exist on this Node
std::vector<std11::shared_ptr<river::Interface> > m_list;
size_t getNumberOfInterface(enum river::modeInterface _interfaceType);
public:
size_t getNumberOfInterface() {
return m_list.size();
}
public:
void registerAsRemote(const std11::shared_ptr<river::Interface>& _interface);
void interfaceAdd(const std11::shared_ptr<river::Interface>& _interface);
@ -89,6 +93,14 @@ namespace river {
return !m_isInput;
}
protected:
std11::weak_ptr<river::io::Group> m_group;
public:
void setGroup(std11::shared_ptr<river::io::Group> _group) {
m_group = _group;
}
protected:
void startInGroup();
void stopInGroup();
virtual void start() = 0;
virtual void stop() = 0;
public:

View File

@ -246,7 +246,7 @@ void river::io::NodeAEC::stop() {
} \
if (pointerOnFile != nullptr) { \
fwrite((dataPointer), sizeof(type), (nbElement), pointerOnFile); \
fflush(pointerOnFile); \
/* fflush(pointerOnFile);*/ \
} \
}while(0)
@ -258,13 +258,14 @@ void river::io::NodeAEC::onDataReceivedMicrophone(const void* _data,
uint32_t _frequency,
const std::vector<audio::channel>& _map) {
RIVER_DEBUG("Microphone Time=" << _time << " _nbChunk=" << _nbChunk << " _map=" << _map << " _format=" << _format << " freq=" << _frequency);
RIVER_DEBUG(" next=" << _time + std11::chrono::nanoseconds(_nbChunk*1000000000LL/int64_t(_frequency)) );
if (_format != audio::format_int16) {
RIVER_ERROR("call wrong type ... (need int16_t)");
}
// push data synchronize
std11::unique_lock<std11::mutex> lock(m_mutex);
m_bufferMicrophone.write(_data, _nbChunk, _time);
SAVE_FILE_MACRO(int16_t, "REC_Microphone.raw", _data, _nbChunk*_map.size());
//SAVE_FILE_MACRO(int16_t, "REC_Microphone.raw", _data, _nbChunk*_map.size());
process();
}
@ -275,13 +276,14 @@ void river::io::NodeAEC::onDataReceivedFeedBack(const void* _data,
uint32_t _frequency,
const std::vector<audio::channel>& _map) {
RIVER_DEBUG("FeedBack Time=" << _time << " _nbChunk=" << _nbChunk << " _map=" << _map << " _format=" << _format << " freq=" << _frequency);
RIVER_DEBUG(" next=" << _time + std11::chrono::nanoseconds(_nbChunk*1000000000LL/int64_t(_frequency)) );
if (_format != audio::format_int16) {
RIVER_ERROR("call wrong type ... (need int16_t)");
}
// push data synchronize
std11::unique_lock<std11::mutex> lock(m_mutex);
m_bufferFeedBack.write(_data, _nbChunk, _time);
SAVE_FILE_MACRO(int16_t, "REC_FeedBack.raw", _data, _nbChunk*_map.size());
//SAVE_FILE_MACRO(int16_t, "REC_FeedBack.raw", _data, _nbChunk*_map.size());
process();
}

View File

@ -218,8 +218,7 @@ river::io::NodeAirTAudio::NodeAirTAudio(const std::string& _name, const std11::s
params.nChannels = 2;
}
airtaudio::StreamOptions option;
etk::from_string(option.mode, m_config->getStringValue("timestamp-mode", "soft"));
etk::from_string(option.mode, tmpObject->getStringValue("timestamp-mode", "soft"));
m_rtaudioFrameSize = nbChunk;
RIVER_INFO("Open output stream nbChannels=" << params.nChannels);
@ -251,18 +250,6 @@ river::io::NodeAirTAudio::NodeAirTAudio(const std::string& _name, const std11::s
RIVER_ERROR("Create stream : '" << m_name << "' mode=" << (m_isInput?"input":"output") << " can not create stream " << err);
}
m_process.updateInterAlgo();
// manage Link Between Nodes :
if (m_link != nullptr) {
RIVER_INFO("******** START LINK ************");
std11::shared_ptr<river::io::NodeAirTAudio> link = std11::dynamic_pointer_cast<river::io::NodeAirTAudio>(m_link);
if (link == nullptr) {
RIVER_ERROR("Can not link 2 Interface with not the same type (reserved for HW interface)");
return;
}
link->m_adac.isMasterOf(m_adac);
// TODO : Add return ...
RIVER_INFO("******** LINK might be done ************");
}
}
river::io::NodeAirTAudio::~NodeAirTAudio() {

View File

@ -14,7 +14,9 @@
namespace river {
namespace io {
class Manager;
class Group;
class NodeAirTAudio : public Node {
friend river::io::Group;
protected:
/**
* @brief Constructor