[DEV] add a basic example on how to publish messga eon a stream and receive it ... ==> mut be doen proper now ...

This commit is contained in:
Edouard DUPIN 2015-03-18 22:06:45 +01:00
parent 4b704287b0
commit 3e0a5bb743
5 changed files with 227 additions and 94 deletions

View File

@ -44,7 +44,6 @@ add_service_files(
FILES
create.srv
remove.srv
write.srv
getBufferTime.srv
)

View File

@ -105,6 +105,198 @@ class InterfaceInput {
}
};
class InterfaceOutputStreamElement {
private:
int32_t m_id;
public:
int32_t getId() {
return m_id;
}
private:
int32_t m_nbConsecutiveUnderflow;
public:
int32_t getCountUnderflow() {
return m_nbConsecutiveUnderflow;
}
private:
std11::shared_ptr<river::Manager> m_manager;
std11::shared_ptr<river::Interface> m_interface;
std11::mutex m_mutex;
public:
InterfaceOutputStreamElement(const std11::shared_ptr<river::Manager>& _manager, int32_t _id) :
m_id(_id),
m_nbConsecutiveUnderflow(0),
m_manager(_manager) {
APPL_INFO("Create interface");
}
~InterfaceOutputStreamElement() {
std11::unique_lock<std::mutex> lock(m_mutex);
APPL_INFO("Remove interfaces (start)");
m_interface->stop();
m_interface.reset();
m_manager.reset();
APPL_INFO("Remove interfaces (done)");
}
void onTopicMessage(const std::string& _streamName, const audio_msg::AudioBuffer::ConstPtr& _msg) {
std11::unique_lock<std::mutex> lock(m_mutex);
if (m_interface != nullptr) {
APPL_VERBOSE("Write data : " << m_id << " size= " << _msg->data.size()/m_interface->getInterfaceFormat().getChunkSize());
m_interface->write(&_msg->data[0], _msg->data.size()/m_interface->getInterfaceFormat().getChunkSize());
m_nbConsecutiveUnderflow = 0;
return;
}
audio::format format = audio::convertFormat(_msg->channelFormat);
std::vector<enum audio::channel> map = audio::convertChannel(_msg->channelMap);
// no interface found => create a new one
m_interface = m_manager->createOutput(_msg->frequency,
map,
format,
_streamName);
if(m_interface == nullptr) {
APPL_ERROR("nullptr interface");
return;
}
m_interface->setReadwrite();
m_interface->setStatusFunction(std11::bind(&InterfaceOutputStreamElement::onStatus, this, std11::placeholders::_1, std11::placeholders::_2, _msg->sourceId));
m_interface->start();
m_interface->write(&_msg->data[0], _msg->data.size()/m_interface->getInterfaceFormat().getChunkSize());
}
void onStatus(const std::string& _origin, const std::string& _status, int32_t _iii) {
APPL_VERBOSE("status event : " << _origin << " status=" << _status << " on i=" << _iii);
m_nbConsecutiveUnderflow++;
}
};
class InterfaceOutputStreamManager {
private:
std::string m_name;
public:
const std::string& getName() {
return m_name;
}
private:
std11::shared_ptr<river::Manager> m_manager;
std::vector<std11::shared_ptr<InterfaceOutputStreamElement> > m_elementList;
std11::mutex m_mutex;
public:
InterfaceOutputStreamManager(const std::string& _name) :
m_name(_name) {
m_manager = river::Manager::create(m_name);
APPL_INFO("Create Manager : " << m_name);
}
~InterfaceOutputStreamManager() {
APPL_INFO("Remove Manager : " << m_name);
std11::unique_lock<std::mutex> lock(m_mutex);
APPL_INFO("Clean list");
m_elementList.clear();
m_manager.reset();
APPL_INFO("All is done ...");
}
void onTopicMessage(const std::string& _streamName, const audio_msg::AudioBuffer::ConstPtr& _msg) {
std11::unique_lock<std::mutex> lock(m_mutex);
for (size_t iii=0; iii<m_elementList.size(); ++iii) {
if (m_elementList[iii] == nullptr) {
continue;
}
if(m_elementList[iii]->getId() == _msg->sourceId) {
m_elementList[iii]->onTopicMessage(_streamName, _msg);
return;
}
}
// no interface found => create a new one
std11::shared_ptr<InterfaceOutputStreamElement> interface = std11::make_shared<InterfaceOutputStreamElement>(m_manager, _msg->sourceId);
if(interface == nullptr) {
APPL_ERROR("nullptr interface");
return;
}
m_elementList.push_back(interface);
interface->onTopicMessage(_streamName, _msg);
m_manager->generateDotAll("myDot.dot");
}
bool onTimer() {
std::vector<std11::shared_ptr<InterfaceOutputStreamElement> >::iterator it = m_elementList.begin();
bool oneElementRemoved = false;
while (it != m_elementList.end()) {
if (*it == nullptr) {
it = m_elementList.erase(it);
continue;
}
if ((*it)->getCountUnderflow() >= 50) {
(*it).reset();
oneElementRemoved = true;
it = m_elementList.erase(it);
continue;
}
++it;
}
if (oneElementRemoved == true) {
m_manager->generateDotAll("myDot.dot");
}
// return remove ...
return m_elementList.size() == 0;
}
};
class InterfaceOutputStream {
public:
std::string m_lowLevelStreamName;
ros::Subscriber m_stream;
ros::Timer m_timer;
std11::mutex m_mutex;
std::vector<std11::shared_ptr<InterfaceOutputStreamManager> > m_list;
public:
InterfaceOutputStream(const std::string& _input="speaker", const std::string& _publisher="speaker") :
m_lowLevelStreamName("speaker") {
ros::NodeHandle nodeHandlePrivate("~");
m_stream = nodeHandlePrivate.subscribe<audio_msg::AudioBuffer>(_publisher,
1000,
boost::bind(&InterfaceOutputStream::onTopicMessage, this, _1));
m_timer = nodeHandlePrivate.createTimer(ros::Duration(ros::Rate(4)), boost::bind(&InterfaceOutputStream::onTimer, this, _1));
}
~InterfaceOutputStream() {
std11::unique_lock<std::mutex> lock(m_mutex);
m_list.clear();
}
void onTopicMessage(const audio_msg::AudioBuffer::ConstPtr& _msg) {
std11::unique_lock<std::mutex> lock(m_mutex);
for (size_t iii=0; iii<m_list.size(); ++iii) {
if (m_list[iii] == nullptr) {
continue;
}
if (m_list[iii]->getName() == _msg->sourceName) {
APPL_VERBOSE("Write data : " << _msg->sourceName);
m_list[iii]->onTopicMessage(m_lowLevelStreamName, _msg);
return;
}
}
// new interface name:
std11::shared_ptr<InterfaceOutputStreamManager> newInterface = std11::make_shared<InterfaceOutputStreamManager>(_msg->sourceName);
if (newInterface == nullptr) {
APPL_ERROR("can not generate new interface element...");
return;
}
m_list.push_back(newInterface);
newInterface->onTopicMessage(m_lowLevelStreamName, _msg);
}
void onTimer(const ros::TimerEvent& _timer) {
std11::unique_lock<std::mutex> lock(m_mutex);
std::vector<std11::shared_ptr<InterfaceOutputStreamManager> >::iterator it = m_list.begin();
while (it != m_list.end()) {
if (*it == nullptr) {
it = m_list.erase(it);
continue;
}
if ((*it)->onTimer() == true) {
(*it).reset();
it = m_list.erase(it);
continue;
}
++it;
}
}
};
class InterfaceOutput {
public:
@ -214,34 +406,6 @@ bool f_remove(audio_core::remove::Request& _req,
return true;
}
bool f_write(audio_core::write::Request& _req,
audio_core::write::Response& _res) {
std11::shared_ptr<InterfaceOutput> interface;
// reset ouput
_res.waitTime = 0;
mutex.lock();
// Find the handle:
for(size_t iii=0; iii<g_listInterafceOut.size(); ++iii) {
if (g_listInterafceOut[iii] == nullptr) {
continue;
}
if (g_listInterafceOut[iii]->getId() == _req.handle) {
interface = g_listInterafceOut[iii];
break;
}
}
mutex.unlock();
if (interface == nullptr) {
APPL_ERROR("write : [" << _req.handle << "] Can not write ==> handle does not exist...");
return false;
}
APPL_INFO("write : [" << _req.handle << "] (start)");
_res.waitTime = interface->write(_req.data);
APPL_INFO("write : [" << _req.handle << "] (end)");
return true;
}
bool f_getBufferTime(audio_core::getBufferTime::Request& _req,
audio_core::getBufferTime::Response& _res) {
std11::shared_ptr<InterfaceOutput> interface;
@ -294,16 +458,15 @@ int main(int _argc, char **_argv) {
ros::ServiceServer serviceCreate = n.advertiseService("create", f_create);
ros::ServiceServer serviceRemove = n.advertiseService("remove", f_remove);
ros::ServiceServer serviceWrite = n.advertiseService("write", f_write);
ros::ServiceServer serviceGetBufferTime = n.advertiseService("getBufferTime", f_getBufferTime);
ros::NodeHandle nodeHandlePrivate("~");
g_manager = river::Manager::create("ROS node");
g_manager = river::Manager::create(n.getNamespace());
// start publishing of Microphone
std11::shared_ptr<InterfaceInput> m_input = std11::make_shared<InterfaceInput>(g_manager, "microphone", "microphone", false);
// start publishing of Speaker feedback
std11::shared_ptr<InterfaceInput> m_feedback = std11::make_shared<InterfaceInput>(g_manager, "speaker", "speaker", true);
std11::shared_ptr<InterfaceInput> m_feedback = std11::make_shared<InterfaceInput>(g_manager, "speaker", "feedback/speaker", true);
// create the Stream for output
std11::shared_ptr<InterfaceOutputStream> m_speaker = std11::make_shared<InterfaceOutputStream>("speaker", "speaker");
/*
* A count of how many messages we have sent. This is used to create

View File

@ -1,6 +0,0 @@
# audio interface UID
int64 handle
# interlaced data of the audio buffer
int8[] data
---
uint32 waitTime

View File

@ -36,7 +36,7 @@ Header header
# source
string sourceName
# source id
uint32 sourceId
int32 sourceId
# current frequency of the audio interface
uint16 frequency
# channel order of the current buffer

View File

@ -7,15 +7,6 @@
#include <sstream>
FILE* filee = NULL;
/**
* This tutorial demonstrates simple receipt of messages over the ROS system.
*/
void audioCallback(const audio_msg::AudioBuffer::ConstPtr& msg) {
fwrite(&msg->data[0] , sizeof(int16_t), msg->data.size(), filee);
ROS_INFO("get message: freq=%d nbChannel=%u nbSample=%ld", (int32_t)msg->frequency, (int32_t)msg->channelMap.size(), msg->data.size());
}
int64_t audioInterface_create(ros::NodeHandle& _n, ros::Time _timeUs, int32_t _freq, const std::vector<uint8_t> _channelMap) {
ros::ServiceClient client = _n.serviceClient<audio_core::create>("create");
audio_core::create srv;
@ -45,24 +36,6 @@ bool audioInterface_remove(ros::NodeHandle& _n, int64_t _uid) {
}
}
/**
* @brief return wait time
*/
uint32_t audioInterface_write(ros::NodeHandle& _n, int64_t _uid, const std::vector<int16_t>& _value) {
ros::ServiceClient client = _n.serviceClient<audio_core::write>("write");
audio_core::write srv;
srv.request.handle = _uid;
srv.request.data.resize(_value.size()*2);
memcpy(&srv.request.data[0], &_value[0], srv.request.data.size());
if (client.call(srv)) {
ROS_INFO("write need wait time : %d", srv.response.waitTime);
return srv.response.waitTime;
} else {
ROS_ERROR("Failed to call service write");
assert(0);
}
}
uint32_t audioInterface_getBufferTime(ros::NodeHandle& _n, int64_t _uid) {
ros::ServiceClient client = _n.serviceClient<audio_core::getBufferTime>("getBufferTime");
audio_core::getBufferTime srv;
@ -88,7 +61,7 @@ void usage() {
}
int main(int argc, char **argv) {
if (argc != 6 ) {
if (argc < 6 ) {
ROS_ERROR ("not enought argument : %d", argc);
usage();
}
@ -136,47 +109,51 @@ int main(int argc, char **argv) {
ROS_ERROR("nb chnnale supported error : %d not in [1,2,3,4]", p_nbChannels);
exit(-1);
}
// connect:
int64_t uid = audioInterface_create(n, timee, p_sampleRate, channelMap);
// new interface: just published data:
ros::Publisher stream;
ros::NodeHandle nodeHandle;
// create the output stream:
stream = nodeHandle.advertise<audio_msg::AudioBuffer>(p_channelToPlay, 100);
audio_msg::AudioBuffer msg;
// Basic source name is the curant node handle (it is unique)
msg.sourceName = ros::NodeHandle("~").getNamespace();
msg.sourceId = 0;
// create the Ros timestamp
msg.header.stamp = ros::Time::now();
// set message frequency
msg.frequency = p_sampleRate;
// set channel map properties
msg.channelMap = channelMap;
// Set the format of flow
msg.channelFormat = audio_msg::AudioBuffer::FORMAT_INT16;
std::vector<int16_t> data;
data.resize(baseDataSize*channelMap.size());
double baseCycle = 2.0*M_PI/(double)p_sampleRate * (double)p_frequency;
int32_t generateTime = (p_timeToPlay * p_sampleRate) / baseDataSize;
for (int32_t kkk=0; kkk<generateTime; ++kkk) {
for (int32_t iii=0; iii<data.size()/channelMap.size(); iii++) {
for (int32_t jjj=0; jjj<channelMap.size(); jjj++) {
data[channelMap.size()*iii+jjj] = cos(phase) * 30000;
data[channelMap.size()*iii+jjj] = cos(phase) * 15000;
}
phase += baseCycle;
if (phase >= 2*M_PI) {
phase -= 2*M_PI;
}
}
//ROS_INFO("send data");
int32_t needSleep = audioInterface_write(n, uid, data);
if (needSleep > 0) {
ROS_INFO("need sleep %d", needSleep);
usleep(needSleep);
} else {
ROS_INFO("not sleep");
// copy data:
msg.data.resize(data.size()*sizeof(int16_t));
memcpy(&msg.data[0], &data[0], data.size()*sizeof(int16_t));
// publish message
stream.publish(msg);
int32_t needSleep = (double(data.size()/channelMap.size()) / (double)p_sampleRate) * 1000000.0 * 0.97;
if (kkk >= 5) {
ROS_INFO_STREAM("need sleep " << needSleep << " µs for " << data.size()/channelMap.size() << " chunks");
usleep(needSleep);
}
}
// wait end if playing :
usleep(200000);
uint32_t waitTime = audioInterface_getBufferTime(n, uid);
while (waitTime>0) {
ROS_INFO("wait end of playing ... %u us", waitTime);
usleep(waitTime/2);
waitTime = audioInterface_getBufferTime(n, uid);
}
// close:
audioInterface_remove(n, uid);
return 0;
}