[DEV] add a status callback on interface

This commit is contained in:
Edouard DUPIN 2015-03-18 21:47:27 +01:00
parent 2ce64df396
commit fd39bbb62f
5 changed files with 129 additions and 24 deletions

View File

@ -101,3 +101,17 @@ size_t drain::Algo::needInputData(size_t _output) {
}
return input;
}
void drain::Algo::setStatusFunction(algoStatusFunction _newFunction) {
m_statusFunction = _newFunction;
}
void drain::Algo::generateStatus(const std::string& _status) {
if (m_statusFunction != nullptr) {
if (m_name.size() == 0) {
m_statusFunction(m_type, _status);
} else {
m_statusFunction(m_name, _status);
}
}
}

View File

@ -21,6 +21,7 @@
#include "debug.h"
namespace drain{
typedef std11::function<void (const std::string& _origin, const std::string& _status)> algoStatusFunction;
class Algo : public std11::enable_shared_from_this<Algo> {
private:
std::string m_name;
@ -37,8 +38,8 @@ namespace drain{
const std::string& getType() const {
return m_type;
}
void setType(const std::string& _name) {
m_type = _name;
void setType(const std::string& _type) {
m_type = _type;
}
private:
bool m_temporary;
@ -49,6 +50,12 @@ namespace drain{
bool getTemporary() const {
return m_temporary;
}
private:
algoStatusFunction m_statusFunction;
public:
void setStatusFunction(algoStatusFunction _newFunction);
protected:
void generateStatus(const std::string& _status);
protected:
std::vector<int8_t> m_outputData;
int8_t m_formatSize; //!< sample size

View File

@ -67,7 +67,7 @@ bool drain::EndPointWrite::process(std11::chrono::system_clock::time_point& _tim
}
// resize output buffer:
//DRAIN_INFO(" resize : " << (int32_t)m_formatSize << "*" << (int32_t)_inputNbChunk << "*" << (int32_t)m_outputMap.size());
m_outputData.resize(m_formatSize*_inputNbChunk*m_output.getMap().size(), 0);
m_outputData.resize(m_formatSize*_inputNbChunk*m_output.getMap().size());
// set output pointer:
_outputNbChunk = m_outputData.size()/(m_formatSize*m_output.getMap().size());
_output = &m_outputData[0];
@ -75,28 +75,33 @@ bool drain::EndPointWrite::process(std11::chrono::system_clock::time_point& _tim
// check if data in the tmpBuffer
if (m_buffer.getSize() == 0) {
DRAIN_WARNING("No data in the user buffer (write null data ... " << _outputNbChunk << " chunks)");
// clear the buffer to force the flush on the next elements ...
m_outputData.clear();
_outputNbChunk = 0;
generateStatus("EPW_UNDERFLOW");
// just send no data ...
return true;
}
DRAIN_INFO("Write " << _outputNbChunk << " chunks");
DRAIN_VERBOSE("Write " << _outputNbChunk << " chunks");
// check if we have enought data:
int32_t nbChunkToCopy = std::min(_inputNbChunk, m_buffer.getSize());
DRAIN_INFO(" " << nbChunkToCopy << " chunks ==> " << nbChunkToCopy*m_output.getMap().size()*m_formatSize << " Byte sizeBuffer=" << m_buffer.getSize());
if (nbChunkToCopy != _inputNbChunk) {
generateStatus("EPW_UNDERFLOW");
}
DRAIN_VERBOSE(" " << nbChunkToCopy << " chunks ==> " << nbChunkToCopy*m_output.getMap().size()*m_formatSize << " Byte sizeBuffer=" << m_buffer.getSize());
_outputNbChunk = nbChunkToCopy;
// copy data to the output:
int32_t nbUnderflow = m_buffer.read(_output, nbChunkToCopy);
if (nbUnderflow != 0) {
DRAIN_WARNING("Undeflow in FIFO ...");
_outputNbChunk -= nbUnderflow;
}
return true;
}
void drain::EndPointWrite::write(const void* _value, size_t _nbChunk) {
std11::unique_lock<std11::mutex> lock(m_mutex);
DRAIN_INFO("[ASYNC] Write data : " << _nbChunk << " chunks"
<< " ==> " << _nbChunk*m_output.getMap().size() << " samples"
<< " formatSize=" << int32_t(m_formatSize)
<< " bufferSize=" << m_buffer.getSize());
DRAIN_VERBOSE("[ASYNC] Write data : " << _nbChunk << " chunks" << " ==> " << m_output);
int32_t nbOverflow = m_buffer.write(_value, _nbChunk);
if (nbOverflow > 0) {
DRAIN_ERROR("Overflow in output buffer : " << nbOverflow << " / " << _nbChunk);

View File

@ -43,7 +43,6 @@ bool drain::Process::pull(std11::chrono::system_clock::time_point& _time,
void* _data,
size_t _nbChunk,
size_t _chunkSize) {
//std::cout << " Interface DIRECT " << std::endl;
while(m_data.size()<_nbChunk*_chunkSize) {
void* in = NULL;
size_t nbChunkIn = _nbChunk - m_data.size()/_chunkSize;
@ -63,26 +62,21 @@ bool drain::Process::pull(std11::chrono::system_clock::time_point& _time,
}
// get data from the upstream
//std::cout << " * request " << nbChunkIn << " chunk" << std::endl;
process(_time, in, nbChunkIn, out, nbChunkOut);
//std::cout << " * get " << nbChunkOut << " chunk" << std::endl;
if (nbChunkOut > 0) {
size_t position = m_data.size();
m_data.resize(m_data.size() + nbChunkOut*_chunkSize);
memcpy(&m_data[position], out, nbChunkOut*_chunkSize);
} else {
// TODO : ERROR ...
// No more data in the process stream (0 input data might have flush data)
break;
}
}
if (m_data.size()>=_nbChunk*_chunkSize) {
//std::cout << " * copy needed data" << std::endl;
memcpy(_data, &m_data[0], _nbChunk*_chunkSize);
m_data.erase(m_data.begin(), m_data.begin()+_nbChunk*_chunkSize);
} else {
DRAIN_WARNING(" * soft underflow");
// ERROR
m_data.clear();
// copy only data availlable :
int32_t minByTeSize = std::min(m_data.size(), _nbChunk*_chunkSize);
if (minByTeSize >= 0) {
memcpy(_data, &m_data[0], minByTeSize);
m_data.erase(m_data.begin(), m_data.begin()+minByTeSize);
}
return true;
}
@ -113,11 +107,13 @@ bool drain::Process::process(std11::chrono::system_clock::time_point& _time,
void drain::Process::pushBack(const std11::shared_ptr<drain::Algo>& _algo) {
removeAlgoDynamic();
_algo->setStatusFunction(std11::bind(&drain::Process::generateStatus, this, std11::placeholders::_1, std11::placeholders::_2));
m_listAlgo.push_back(_algo);
}
void drain::Process::pushFront(const std11::shared_ptr<drain::Algo>& _algo) {
removeAlgoDynamic();
_algo->setStatusFunction(std11::bind(&drain::Process::generateStatus, this, std11::placeholders::_1, std11::placeholders::_2));
m_listAlgo.insert(m_listAlgo.begin(), _algo);
}
@ -347,6 +343,7 @@ void drain::Process::updateAlgo(size_t _position) {
algo->setInputFormat(out);
out.setFormat(audio::format_int16);
algo->setOutputFormat(out);
algo->setStatusFunction(std11::bind(&drain::Process::generateStatus, this, std11::placeholders::_1, std11::placeholders::_2));
m_listAlgo.insert(m_listAlgo.begin()+_position, algo);
DRAIN_VERBOSE("convert " << out.getFormat() << " -> " << in.getFormat());
_position++;
@ -357,6 +354,7 @@ void drain::Process::updateAlgo(size_t _position) {
algo->setInputFormat(out);
out.setFrequency(in.getFrequency());
algo->setOutputFormat(out);
algo->setStatusFunction(std11::bind(&drain::Process::generateStatus, this, std11::placeholders::_1, std11::placeholders::_2));
m_listAlgo.insert(m_listAlgo.begin()+_position, algo);
DRAIN_VERBOSE("convert " << out.getFrequency() << " -> " << in.getFrequency());
out.setFrequency(in.getFrequency());
@ -369,6 +367,7 @@ void drain::Process::updateAlgo(size_t _position) {
algo->setInputFormat(out);
out.setMap(in.getMap());
algo->setOutputFormat(out);
algo->setStatusFunction(std11::bind(&drain::Process::generateStatus, this, std11::placeholders::_1, std11::placeholders::_2));
m_listAlgo.insert(m_listAlgo.begin()+_position, algo);
DRAIN_VERBOSE("convert " << out.getMap() << " -> " << in.getMap());
_position++;
@ -380,6 +379,7 @@ void drain::Process::updateAlgo(size_t _position) {
algo->setInputFormat(out);
out.setFormat(in.getFormat());
algo->setOutputFormat(out);
algo->setStatusFunction(std11::bind(&drain::Process::generateStatus, this, std11::placeholders::_1, std11::placeholders::_2));
m_listAlgo.insert(m_listAlgo.begin()+_position, algo);
DRAIN_VERBOSE("convert " << out.getFormat() << " -> " << in.getFormat());
_position++;
@ -462,7 +462,7 @@ static void link(etk::FSNode& _node, const std::string& _first, const std::strin
void drain::Process::generateDot(etk::FSNode& _node, int32_t _offset, int32_t _basicID, std::string& _nameIn, std::string& _nameOut, bool _reserseGraph) {
_node << " subgraph clusterNode_" << _basicID << "_process {\n";
_node << " label=\"Drain::Process\";\n";
_node << " label=\"Drain::Process" << (_reserseGraph?"_R":"_N") << "\";\n";
_node << " node [shape=ellipse];\n";
if (_reserseGraph == false) {
@ -489,7 +489,8 @@ void drain::Process::generateDot(etk::FSNode& _node, int32_t _offset, int32_t _b
connectString = connectStringSecond;
}
} else {
for (int32_t iii=m_listAlgo.size()-1; iii>=0; --iii) {
//for (int32_t iii=m_listAlgo.size()-1; iii>=0; --iii) {
for (size_t iii=0; iii<m_listAlgo.size(); ++iii) {
if (m_listAlgo[iii] == nullptr) {
continue;
}
@ -519,3 +520,73 @@ void drain::Process::generateDot(etk::FSNode& _node, int32_t _offset, int32_t _b
_node << " }\n";
}
void drain::Process::generateDotProcess(etk::FSNode& _node, int32_t _offset, int32_t _basicID, std::string& _nameIn, std::string& _nameOut, bool _reserseGraph) {
_node << " subgraph clusterNode_" << _basicID << "_process {\n";
_node << " label=\"Drain::Process" << (_reserseGraph?"_R":"_N") << "\";\n";
_node << " node [shape=ellipse];\n";
if (_reserseGraph == true) {
_nameIn = "INTERFACE_ALGO_" + etk::to_string(_basicID) + "_in";
_node << " " << _nameIn << " [ label=\"format=" << etk::to_string(getInputConfig().getFormat())
<< "\\n freq=" << getInputConfig().getFrequency()
<< "\\n channelMap=" << etk::to_string(getInputConfig().getMap()) << "\\n in\" ];\n";
} else {
_nameIn = "INTERFACE_ALGO_" + etk::to_string(_basicID) + "_out";
_node << " " << _nameIn << " [ label=\"format=" << etk::to_string(getOutputConfig().getFormat())
<< "\\n freq=" << getOutputConfig().getFrequency()
<< "\\n channelMap=" << etk::to_string(getOutputConfig().getMap()) << "\\n out\" ];\n";
}
std::string connectString = _nameIn;
_node << " node [shape=box];\n";
if (_reserseGraph == false) {
for (size_t iii=0; iii<m_listAlgo.size(); ++iii) {
if (m_listAlgo[iii] == nullptr) {
continue;
}
std::string connectStringSecond = "ALGO_" + etk::to_string(_basicID) + "__" + etk::to_string(iii);
_node << " " << connectStringSecond << " [label=\"ALGO\\ntype='" << m_listAlgo[iii]->getType() << "'\\nname='" << m_listAlgo[iii]->getName() << "'\" ];\n";
link(_node, connectString, "->", connectStringSecond);
connectString = connectStringSecond;
}
} else {
//for (int32_t iii=m_listAlgo.size()-1; iii>=0; --iii) {
for (size_t iii=0; iii<m_listAlgo.size(); ++iii) {
if (m_listAlgo[iii] == nullptr) {
continue;
}
std::string connectStringSecond = "ALGO_" + etk::to_string(_basicID) + "__" + etk::to_string(iii);
_node << " " << connectStringSecond << " [label=\"ALGO\\ntype='" << m_listAlgo[iii]->getType() << "'\\nname='" << m_listAlgo[iii]->getName() << "'\" ];\n";
link(_node, connectStringSecond, "<-", connectString);
connectString = connectStringSecond;
}
}
_node << " node [shape=ellipse];\n";
if (_reserseGraph == true) {
_nameOut = "INTERFACE_ALGO_" + etk::to_string(_basicID) + "_out";
_node << " " << _nameOut << " [ label=\"format=" << etk::to_string(getOutputConfig().getFormat())
<< "\\n freq=" << getOutputConfig().getFrequency()
<< "\\n channelMap=" << etk::to_string(getOutputConfig().getMap()) << "\\n out\" ];\n";
} else {
_nameOut = "INTERFACE_ALGO_" + etk::to_string(_basicID) + "_in";
_node << " " << _nameOut << " [ label=\"format=" << etk::to_string(getInputConfig().getFormat())
<< "\\n freq=" << getInputConfig().getFrequency()
<< "\\n channelMap=" << etk::to_string(getInputConfig().getMap()) << "\\n in\" ];\n";
}
if (_reserseGraph == false) {
link(_node, connectString, "->", _nameOut);
} else {
link(_node, _nameOut, "<-", connectString);
}
_node << " }\n";
}
void drain::Process::generateStatus(const std::string& _origin, const std::string& _status) {
if (m_statusFunction != nullptr) {
m_statusFunction(_origin, _status);
}
}
void drain::Process::setStatusFunction(statusFunction _newFunction) {
m_statusFunction = _newFunction;
}

View File

@ -19,6 +19,7 @@
#include <etk/os/FSNode.h>
namespace drain{
typedef std11::function<void (const std::string& _origin, const std::string& _status)> statusFunction;
class Process {
protected:
std::vector<int8_t> m_data; //!< temporary overlap output buffer (change size of the output data)
@ -166,6 +167,11 @@ namespace drain{
}
return false;
}
private:
statusFunction m_statusFunction;
public:
void generateStatus(const std::string& _origin, const std::string& _status);
void setStatusFunction(statusFunction _newFunction);
private:
bool m_isConfigured;
public:
@ -176,6 +182,8 @@ namespace drain{
void updateAlgo(size_t _position);
public:
void generateDot(etk::FSNode& _node, int32_t _offset, int32_t _basicID, std::string& _nameIn, std::string& _nameOut, bool _reserseGraph);
// TODO : Remove this one when we find a good way to do it ...
void generateDotProcess(etk::FSNode& _node, int32_t _offset, int32_t _basicID, std::string& _nameIn, std::string& _nameOut, bool _reserseGraph);
};
};