/** @file * @author Edouard DUPIN * @copyright 2015, Edouard DUPIN, all right reserved * @license APACHE v2.0 (see license file) */ #include "Node.h" #include #include #undef __class__ #define __class__ "io::Node" #ifndef INT16_MAX #define INT16_MAX 0x7fff #endif #ifndef INT16_MIN #define INT16_MIN (-INT16_MAX - 1) #endif #ifndef INT32_MAX #define INT32_MAX 0x7fffffffL #endif #ifndef INT32_MIN #define INT32_MIN (-INT32_MAX - 1L) #endif std::string asString(const std::chrono::system_clock::time_point& tp) { // convert to system time: std::time_t t = std::chrono::system_clock::to_time_t(tp); // convert in human string std::string ts = std::ctime(&t); // remove \n ts.resize(ts.size()-1); return ts; } namespace std { std::ostream& operator <<(std::ostream& _os, const std::chrono::system_clock::time_point& _obj) { std::chrono::microseconds us = std::chrono::duration_cast(_obj.time_since_epoch()); _os << us.count(); return _os; } } int32_t river::io::Node::airtAudioCallback(void* _outputBuffer, void* _inputBuffer, uint32_t _nbChunk, const std::chrono::system_clock::time_point& _time, airtaudio::status _status) { std::unique_lock lock(m_mutex); //RIVER_INFO("Time=" << _time); /* for (int32_t iii=0; iii<400; ++iii) { RIVER_VERBOSE("dummy=" << uint64_t(dummy[iii])); } */ if (_outputBuffer != nullptr) { RIVER_VERBOSE("data Output size request :" << _nbChunk << " [BEGIN] status=" << _status << " nbIO=" << m_list.size()); std::vector output; RIVER_VERBOSE("resize=" << _nbChunk*m_process.getInputConfig().getMap().size()); output.resize(_nbChunk*m_process.getInputConfig().getMap().size(), 0); const int32_t* outputTmp = nullptr; std::vector outputTmp2; RIVER_VERBOSE("resize=" << sizeof(int32_t)*m_process.getInputConfig().getMap().size()*_nbChunk); outputTmp2.resize(sizeof(int32_t)*m_process.getInputConfig().getMap().size()*_nbChunk, 0); for (auto &it : m_list) { if (it == nullptr) { continue; } if (it->getMode() != river::modeInterface_output) { continue; } RIVER_VERBOSE(" IO name="<< it->getName()); // clear datas ... memset(&outputTmp2[0], 0, sizeof(int32_t)*m_process.getInputConfig().getMap().size()*_nbChunk); RIVER_VERBOSE(" request Data="<< _nbChunk); it->systemNeedOutputData(_time, &outputTmp2[0], _nbChunk, sizeof(int32_t)*m_process.getInputConfig().getMap().size()); RIVER_VERBOSE(" Mix it ..."); outputTmp = reinterpret_cast(&outputTmp2[0]); // Add data to the output tmp buffer : for (size_t kkk=0; kkkgetMode() != river::modeInterface_feedback) { continue; } RIVER_VERBOSE(" IO name="<< it->getName() << " (feedback)"); it->systemNewInputData(_time, _outputBuffer, _nbChunk); } RIVER_VERBOSE("data Output size request :" << _nbChunk << " [ END ]"); } if (_inputBuffer != nullptr) { RIVER_VERBOSE("data Input size request :" << _nbChunk << " [BEGIN] status=" << _status << " nbIO=" << m_list.size()); int16_t* inputBuffer = static_cast(_inputBuffer); for (auto &it : m_list) { if (it == nullptr) { continue; } if (it->getMode() != river::modeInterface_input) { continue; } RIVER_INFO(" IO name="<< it->getName()); it->systemNewInputData(_time, inputBuffer, _nbChunk); } RIVER_VERBOSE("data Input size request :" << _nbChunk << " [ END ]"); } return 0; } std::shared_ptr river::io::Node::create(const std::string& _name, const std::shared_ptr& _config) { return std::shared_ptr(new river::io::Node(_name, _config)); } river::io::Node::Node(const std::string& _name, const std::shared_ptr& _config) : m_config(_config), m_name(_name), m_isInput(false) { RIVER_INFO("-----------------------------------------------------------------"); RIVER_INFO("-- CREATE NODE --"); RIVER_INFO("-----------------------------------------------------------------"); drain::IOFormatInterface interfaceFormat; drain::IOFormatInterface hardwareFormat; /** io:"input", # input or output map-on:{ # select hardware interface and name interface:"alsa", # interface : "alsa", "pulse", "core", ... name:"default", # name of the interface }, frequency:48000, # frequency to open device channel-map:[ # mapping of the harware device (to change map if needed) "front-left", "front-right", "read-left", "rear-right", ], type:"int16", # format to open device (int8, int16, int16-on-ont32, int24, int32, float) nb-chunk:1024 # number of chunk to open device (create the latency anf the frequency to call user) */ m_isInput = m_config->getStringValue("io") == "input"; enum airtaudio::type typeInterface = airtaudio::type_undefined; std::string streamName = "default"; const std::shared_ptr tmpObject = m_config->getObject("map-on"); if (tmpObject == nullptr) { RIVER_WARNING("missing node : 'map-on' ==> auto map : 'auto:default'"); } else { std::string value = tmpObject->getStringValue("interface", "default"); typeInterface = airtaudio::getTypeFromString(value); streamName = tmpObject->getStringValue("name", "default"); } int32_t frequency = m_config->getNumberValue("frequency", 1); std::string type = m_config->getStringValue("type", "int16"); int32_t nbChunk = m_config->getNumberValue("nb-chunk", 1024); std::string volumeName = m_config->getStringValue("volume-name", ""); if (volumeName != "") { RIVER_INFO("add node volume stage : '" << volumeName << "'"); // use global manager for volume ... m_volume = river::io::Manager::getInstance()->getVolumeGroup(volumeName); } enum audio::format formatType = audio::getFormatFromString(type); // TODO : MAP ... // intanciate specific API ... m_adac.instanciate(typeInterface); // TODO : Check return ... if (streamName == "") { streamName = "default"; } std::vector map; // set default channel property : map.push_back(audio::channel_frontLeft); map.push_back(audio::channel_frontRight); hardwareFormat.set(map, formatType, frequency); // TODO : Better view of interface type float -> float, int16 -> int16/int32, ... if (m_isInput == true) { // for input we just transfert audio with no transformation interfaceFormat.set(map, audio::format_int16, frequency); } else { // for output we will do a mix ... interfaceFormat.set(map, audio::format_int16_on_int32, frequency); } // search device ID : RIVER_INFO("Open :"); RIVER_INFO(" m_streamName=" << streamName); RIVER_INFO(" m_freq=" << hardwareFormat.getFrequency()); RIVER_INFO(" m_map=" << hardwareFormat.getMap()); RIVER_INFO(" m_format=" << hardwareFormat.getFormat()); RIVER_INFO(" m_isInput=" << m_isInput); int32_t deviceId = 0; RIVER_INFO("Device list:"); for (int32_t iii=0; iii lock(m_mutex); RIVER_INFO("-----------------------------------------------------------------"); RIVER_INFO("-- DESTROY NODE --"); RIVER_INFO("-----------------------------------------------------------------"); RIVER_INFO("close input stream"); if (m_adac.isStreamOpen() ) { m_adac.closeStream(); } }; void river::io::Node::start() { std::unique_lock lock(m_mutex); RIVER_INFO("Start stream : '" << m_name << "' mode=" << (m_isInput?"input":"output") ); enum airtaudio::error err = m_adac.startStream(); if (err != airtaudio::error_none) { RIVER_ERROR("Start stream : '" << m_name << "' mode=" << (m_isInput?"input":"output") << " can not start stream ... " << err); } } void river::io::Node::stop() { std::unique_lock lock(m_mutex); RIVER_INFO("Stop stream : '" << m_name << "' mode=" << (m_isInput?"input":"output") ); enum airtaudio::error err = m_adac.stopStream(); if (err != airtaudio::error_none) { RIVER_ERROR("Stop stream : '" << m_name << "' mode=" << (m_isInput?"input":"output") << " can not stop stream ... " << err); } } void river::io::Node::registerAsRemote(const std::shared_ptr& _interface) { auto it = m_listAvaillable.begin(); while (it != m_listAvaillable.end()) { if (it->expired() == true) { it = m_listAvaillable.erase(it); } ++it; } m_listAvaillable.push_back(_interface); } void river::io::Node::interfaceAdd(const std::shared_ptr& _interface) { { std::unique_lock lock(m_mutex); for (size_t iii=0; iii< m_list.size(); ++iii) { if (_interface == m_list[iii]) { return; } } RIVER_INFO("ADD interface for stream : '" << m_name << "' mode=" << (m_isInput?"input":"output") ); m_list.push_back(_interface); } if (m_list.size() == 1) { start(); } } void river::io::Node::interfaceRemove(const std::shared_ptr& _interface) { { std::unique_lock lock(m_mutex); for (size_t iii=0; iii< m_list.size(); ++iii) { if (_interface == m_list[iii]) { m_list.erase(m_list.begin()+iii); RIVER_INFO("RM interface for stream : '" << m_name << "' mode=" << (m_isInput?"input":"output") ); break; } } } if (m_list.size() == 0) { stop(); } return; } void river::io::Node::volumeChange() { for (auto &it : m_listAvaillable) { std::shared_ptr node = it.lock(); if (node != nullptr) { node->systemVolumeChange(); } } }