diff --git a/webrtc/modules/audio_device/test/audio_device_test_api.cc b/webrtc/modules/audio_device/test/audio_device_test_api.cc index b4b8235d0..09f70bd85 100644 --- a/webrtc/modules/audio_device/test/audio_device_test_api.cc +++ b/webrtc/modules/audio_device/test/audio_device_test_api.cc @@ -166,7 +166,7 @@ class AudioDeviceAPITest: public testing::Test { virtual ~AudioDeviceAPITest() {} static void SetUpTestCase() { - process_thread_ = ProcessThread::CreateProcessThread(); + process_thread_ = ProcessThread::Create(); process_thread_->Start(); // Windows: @@ -274,7 +274,7 @@ class AudioDeviceAPITest: public testing::Test { if (process_thread_) { process_thread_->DeRegisterModule(audio_device_); process_thread_->Stop(); - ProcessThread::DestroyProcessThread(process_thread_); + process_thread_.reset(); } if (event_observer_) { delete event_observer_; @@ -324,8 +324,9 @@ class AudioDeviceAPITest: public testing::Test { EXPECT_FALSE(audio_device_->MicrophoneIsInitialized()); } + // TODO(henrika): Get rid of globals. static bool linux_alsa_; - static ProcessThread* process_thread_; + static rtc::scoped_ptr process_thread_; static AudioDeviceModule* audio_device_; static AudioTransportAPI* audio_transport_; static AudioEventObserverAPI* event_observer_; @@ -333,7 +334,7 @@ class AudioDeviceAPITest: public testing::Test { // Must be initialized like this to handle static SetUpTestCase() above. bool AudioDeviceAPITest::linux_alsa_ = false; -ProcessThread* AudioDeviceAPITest::process_thread_ = NULL; +rtc::scoped_ptr AudioDeviceAPITest::process_thread_; AudioDeviceModule* AudioDeviceAPITest::audio_device_ = NULL; AudioTransportAPI* AudioDeviceAPITest::audio_transport_ = NULL; AudioEventObserverAPI* AudioDeviceAPITest::event_observer_ = NULL; diff --git a/webrtc/modules/audio_device/test/func_test_manager.cc b/webrtc/modules/audio_device/test/func_test_manager.cc index 3599873f3..d6e5cb3d9 100644 --- a/webrtc/modules/audio_device/test/func_test_manager.cc +++ b/webrtc/modules/audio_device/test/func_test_manager.cc @@ -558,7 +558,6 @@ void AudioTransportImpl::PullRenderData(int bits_per_sample, int sample_rate, int64_t* ntp_time_ms) {} FuncTestManager::FuncTestManager() : - _processThread(NULL), _audioDevice(NULL), _audioEventObserver(NULL), _audioTransport(NULL) @@ -579,7 +578,7 @@ FuncTestManager::~FuncTestManager() int32_t FuncTestManager::Init() { - EXPECT_TRUE((_processThread = ProcessThread::CreateProcessThread()) != NULL); + EXPECT_TRUE((_processThread = ProcessThread::Create()) != NULL); if (_processThread == NULL) { return -1; @@ -620,7 +619,7 @@ int32_t FuncTestManager::Close() { _processThread->DeRegisterModule(_audioDevice); _processThread->Stop(); - ProcessThread::DestroyProcessThread(_processThread); + _processThread.reset(); } // delete the audio observer @@ -789,7 +788,7 @@ int32_t FuncTestManager::TestAudioLayerSelection() { _processThread->DeRegisterModule(_audioDevice); _processThread->Stop(); - ProcessThread::DestroyProcessThread(_processThread); + _processThread.reset(); } // delete the audio observer @@ -816,7 +815,7 @@ int32_t FuncTestManager::TestAudioLayerSelection() // ================================================== // Next, try to make fresh start with new audio layer - EXPECT_TRUE((_processThread = ProcessThread::CreateProcessThread()) != NULL); + EXPECT_TRUE((_processThread = ProcessThread::Create()) != NULL); if (_processThread == NULL) { return -1; diff --git a/webrtc/modules/audio_device/test/func_test_manager.h b/webrtc/modules/audio_device/test/func_test_manager.h index 5cb4f4610..3c80cdc4a 100644 --- a/webrtc/modules/audio_device/test/func_test_manager.h +++ b/webrtc/modules/audio_device/test/func_test_manager.h @@ -245,7 +245,7 @@ private: std::string _playoutFile16; std::string _playoutFile8; - ProcessThread* _processThread; + rtc::scoped_ptr _processThread; AudioDeviceModule* _audioDevice; AudioEventObserver* _audioEventObserver; AudioTransportImpl* _audioTransport; diff --git a/webrtc/modules/interface/module.h b/webrtc/modules/interface/module.h index c3e303039..dfa1ad4e7 100644 --- a/webrtc/modules/interface/module.h +++ b/webrtc/modules/interface/module.h @@ -21,6 +21,11 @@ class Module { // thread to call Process. // This method is called on the same worker thread as Process will // be called on. + // TODO(tommi): Almost all implementations of this function, need to know + // the current tick count. Consider passing it as an argument. It could + // also improve the accuracy of when the next callback occurs since the + // thread that calls Process() will also have it's tick count reference + // which might not match with what the implementations use. virtual int64_t TimeUntilNextProcess() = 0; // Process any pending tasks such as timeouts. diff --git a/webrtc/modules/modules.gyp b/webrtc/modules/modules.gyp index 376e6c5b4..4a92d21f3 100644 --- a/webrtc/modules/modules.gyp +++ b/webrtc/modules/modules.gyp @@ -254,6 +254,7 @@ 'rtp_rtcp/test/testAPI/test_api_video.cc', 'utility/source/audio_frame_operations_unittest.cc', 'utility/source/file_player_unittests.cc', + 'utility/source/process_thread_impl_unittest.cc', 'video_coding/codecs/test/packet_manipulator_unittest.cc', 'video_coding/codecs/test/stats_unittest.cc', 'video_coding/codecs/test/videoprocessor_unittest.cc', diff --git a/webrtc/modules/utility/interface/mock/mock_process_thread.h b/webrtc/modules/utility/interface/mock/mock_process_thread.h index fc0c1fb1c..a181812f9 100644 --- a/webrtc/modules/utility/interface/mock/mock_process_thread.h +++ b/webrtc/modules/utility/interface/mock/mock_process_thread.h @@ -21,6 +21,7 @@ class MockProcessThread : public ProcessThread { public: MOCK_METHOD0(Start, int32_t()); MOCK_METHOD0(Stop, int32_t()); + MOCK_METHOD1(WakeUp, void(Module* module)); MOCK_METHOD1(RegisterModule, int32_t(Module* module)); MOCK_METHOD1(DeRegisterModule, int32_t(const Module* module)); }; diff --git a/webrtc/modules/utility/interface/process_thread.h b/webrtc/modules/utility/interface/process_thread.h index 4db92a308..2262f2d7a 100644 --- a/webrtc/modules/utility/interface/process_thread.h +++ b/webrtc/modules/utility/interface/process_thread.h @@ -12,23 +12,39 @@ #define WEBRTC_MODULES_UTILITY_INTERFACE_PROCESS_THREAD_H_ #include "webrtc/typedefs.h" +#include "webrtc/base/scoped_ptr.h" namespace webrtc { class Module; -class ProcessThread -{ -public: - static ProcessThread* CreateProcessThread(); - static void DestroyProcessThread(ProcessThread* module); +class ProcessThread { + public: + virtual ~ProcessThread(); - virtual int32_t Start() = 0; - virtual int32_t Stop() = 0; + static rtc::scoped_ptr Create(); - virtual int32_t RegisterModule(Module* module) = 0; - virtual int32_t DeRegisterModule(const Module* module) = 0; -protected: - virtual ~ProcessThread(); + // Starts the worker thread. Must be called from the construction thread. + virtual int32_t Start() = 0; + + // Stops the worker thread. Must be called from the construction thread. + virtual int32_t Stop() = 0; + + // Wakes the thread up to give a module a chance to do processing right + // away. This causes the worker thread to wake up and requery the specified + // module for when it should be called back. (Typically the module should + // return 0 from TimeUntilNextProcess on the worker thread at that point). + // Can be called on any thread. + virtual void WakeUp(Module* module) = 0; + + // Adds a module that will start to receive callbacks on the worker thread. + // Can be called from any thread. + virtual int32_t RegisterModule(Module* module) = 0; + + // Removes a previously registered module. + // Can be called from any thread. + virtual int32_t DeRegisterModule(const Module* module) = 0; }; + } // namespace webrtc + #endif // WEBRTC_MODULES_UTILITY_INTERFACE_PROCESS_THREAD_H_ diff --git a/webrtc/modules/utility/source/process_thread_impl.cc b/webrtc/modules/utility/source/process_thread_impl.cc index e2e9542c7..9b13daa2b 100644 --- a/webrtc/modules/utility/source/process_thread_impl.cc +++ b/webrtc/modules/utility/source/process_thread_impl.cc @@ -8,163 +8,161 @@ * be found in the AUTHORS file in the root of the source tree. */ -#include "webrtc/modules/interface/module.h" #include "webrtc/modules/utility/source/process_thread_impl.h" +#include "webrtc/base/checks.h" +#include "webrtc/modules/interface/module.h" +#include "webrtc/system_wrappers/interface/logging.h" +#include "webrtc/system_wrappers/interface/tick_util.h" namespace webrtc { -ProcessThread::~ProcessThread() -{ +namespace { +int64_t GetNextCallbackTime(Module* module, int64_t time_now) { + int64_t interval = module->TimeUntilNextProcess(); + // Currently some implementations erroneously return error codes from + // TimeUntilNextProcess(). So, as is, we correct that and log an error. + if (interval < 0) { + LOG(LS_ERROR) << "TimeUntilNextProcess returned an invalid value " + << interval; + interval = 0; + } + return time_now + interval; +} } -ProcessThread* ProcessThread::CreateProcessThread() -{ - return new ProcessThreadImpl(); -} +ProcessThread::~ProcessThread() {} -void ProcessThread::DestroyProcessThread(ProcessThread* module) -{ - delete module; +// static +rtc::scoped_ptr ProcessThread::Create() { + return rtc::scoped_ptr(new ProcessThreadImpl()).Pass(); } ProcessThreadImpl::ProcessThreadImpl() - : _timeEvent(*EventWrapper::Create()), - _critSectModules(CriticalSectionWrapper::CreateCriticalSection()), - _thread(NULL) -{ + : wake_up_(EventWrapper::Create()), stop_(false) { } -ProcessThreadImpl::~ProcessThreadImpl() -{ - delete _critSectModules; - delete &_timeEvent; +ProcessThreadImpl::~ProcessThreadImpl() { + DCHECK(thread_checker_.CalledOnValidThread()); + DCHECK(!thread_.get()); + DCHECK(!stop_); } -int32_t ProcessThreadImpl::Start() -{ - CriticalSectionScoped lock(_critSectModules); - if(_thread) - { - return -1; - } - _thread = ThreadWrapper::CreateThread(Run, this, kNormalPriority, - "ProcessThread"); - unsigned int id; - int32_t retVal = _thread->Start(id); - if(retVal >= 0) - { - return 0; - } - delete _thread; - _thread = NULL; +int32_t ProcessThreadImpl::Start() { + DCHECK(thread_checker_.CalledOnValidThread()); + if (thread_.get()) return -1; + + DCHECK(!stop_); + + thread_.reset(ThreadWrapper::CreateThread( + &ProcessThreadImpl::Run, this, kNormalPriority, "ProcessThread")); + unsigned int id; + if (!thread_->Start(id)) { + thread_.reset(); + return -1; + } + + return 0; } -int32_t ProcessThreadImpl::Stop() -{ - _critSectModules->Enter(); - if(_thread) - { - ThreadWrapper* thread = _thread; - _thread = NULL; - - _timeEvent.Set(); - _critSectModules->Leave(); - - if(thread->Stop()) - { - delete thread; - } else { - return -1; - } - } else { - _critSectModules->Leave(); - } +int32_t ProcessThreadImpl::Stop() { + DCHECK(thread_checker_.CalledOnValidThread()); + if(!thread_.get()) return 0; + + { + rtc::CritScope lock(&lock_); + stop_ = true; + } + + wake_up_->Set(); + + thread_->Stop(); + thread_.reset(); + stop_ = false; + + return 0; } -int32_t ProcessThreadImpl::RegisterModule(Module* module) -{ - CriticalSectionScoped lock(_critSectModules); +void ProcessThreadImpl::WakeUp(Module* module) { + // Allowed to be called on any thread. + { + rtc::CritScope lock(&lock_); + ModuleCallback cb(module); + const auto& found = std::find(modules_.begin(), modules_.end(), cb); + DCHECK(found != modules_.end()) << "programmer error?"; + (*found).next_callback = 0; + } + wake_up_->Set(); +} + +int32_t ProcessThreadImpl::RegisterModule(Module* module) { + // Allowed to be called on any thread. + { + rtc::CritScope lock(&lock_); // Only allow module to be registered once. - for (ModuleList::iterator iter = _modules.begin(); - iter != _modules.end(); ++iter) { - if(module == *iter) - { - return -1; - } - } + ModuleCallback cb(module); + if (std::find(modules_.begin(), modules_.end(), cb) != modules_.end()) + return -1; + modules_.push_front(cb); + } - _modules.push_front(module); + // Wake the thread calling ProcessThreadImpl::Process() to update the + // waiting time. The waiting time for the just registered module may be + // shorter than all other registered modules. + wake_up_->Set(); - // Wake the thread calling ProcessThreadImpl::Process() to update the - // waiting time. The waiting time for the just registered module may be - // shorter than all other registered modules. - _timeEvent.Set(); - return 0; + return 0; } -int32_t ProcessThreadImpl::DeRegisterModule(const Module* module) -{ - CriticalSectionScoped lock(_critSectModules); - for (ModuleList::iterator iter = _modules.begin(); - iter != _modules.end(); ++iter) { - if(module == *iter) - { - _modules.erase(iter); - return 0; - } - } - return -1; +int32_t ProcessThreadImpl::DeRegisterModule(const Module* module) { + // Allowed to be called on any thread. + rtc::CritScope lock(&lock_); + modules_.remove_if([&module](const ModuleCallback& m) { + return m.module == module; + }); + return 0; } -bool ProcessThreadImpl::Run(void* obj) -{ - return static_cast(obj)->Process(); +// static +bool ProcessThreadImpl::Run(void* obj) { + return static_cast(obj)->Process(); } -bool ProcessThreadImpl::Process() -{ - // Wait for the module that should be called next, but don't block thread - // longer than 100 ms. - int64_t minTimeToNext = 100; - { - CriticalSectionScoped lock(_critSectModules); - for (ModuleList::iterator iter = _modules.begin(); - iter != _modules.end(); ++iter) { - int64_t timeToNext = (*iter)->TimeUntilNextProcess(); - if(minTimeToNext > timeToNext) - { - minTimeToNext = timeToNext; - } - } - } +bool ProcessThreadImpl::Process() { + int64_t now = TickTime::MillisecondTimestamp(); + int64_t next_checkpoint = now + (1000 * 60); + { + rtc::CritScope lock(&lock_); + if (stop_) + return false; + for (auto& m : modules_) { + // TODO(tommi): Would be good to measure the time TimeUntilNextProcess + // takes and dcheck if it takes too long (e.g. >=10ms). Ideally this + // operation should not require taking a lock, so querying all modules + // should run in a matter of nanoseconds. + if (m.next_callback == 0) + m.next_callback = GetNextCallbackTime(m.module, now); - if(minTimeToNext > 0) - { - if(kEventError == - _timeEvent.Wait(static_cast(minTimeToNext))) - { - return true; - } - CriticalSectionScoped lock(_critSectModules); - if(!_thread) - { - return false; - } + if (m.next_callback <= now) { + m.module->Process(); + // Use a new 'now' reference to calculate when the next callback + // should occur. We'll continue to use 'now' above for the baseline + // of calculating how long we should wait, to reduce variance. + auto new_now = TickTime::MillisecondTimestamp(); + m.next_callback = GetNextCallbackTime(m.module, new_now); + } + + if (m.next_callback < next_checkpoint) + next_checkpoint = m.next_callback; } - { - CriticalSectionScoped lock(_critSectModules); - for (ModuleList::iterator iter = _modules.begin(); - iter != _modules.end(); ++iter) { - int64_t timeToNext = (*iter)->TimeUntilNextProcess(); - if(timeToNext < 1) - { - (*iter)->Process(); - } - } - } - return true; + } + + auto time_to_wait = next_checkpoint - TickTime::MillisecondTimestamp(); + if (time_to_wait > 0) + wake_up_->Wait(static_cast(time_to_wait)); + + return true; } } // namespace webrtc diff --git a/webrtc/modules/utility/source/process_thread_impl.h b/webrtc/modules/utility/source/process_thread_impl.h index 14fbc18a2..5a42140fa 100644 --- a/webrtc/modules/utility/source/process_thread_impl.h +++ b/webrtc/modules/utility/source/process_thread_impl.h @@ -13,37 +13,52 @@ #include +#include "webrtc/base/criticalsection.h" +#include "webrtc/base/thread_checker.h" #include "webrtc/modules/utility/interface/process_thread.h" -#include "webrtc/system_wrappers/interface/critical_section_wrapper.h" #include "webrtc/system_wrappers/interface/event_wrapper.h" #include "webrtc/system_wrappers/interface/thread_wrapper.h" #include "webrtc/typedefs.h" namespace webrtc { -class ProcessThreadImpl : public ProcessThread -{ -public: - ProcessThreadImpl(); - virtual ~ProcessThreadImpl(); - virtual int32_t Start(); - virtual int32_t Stop(); +class ProcessThreadImpl : public ProcessThread { + public: + ProcessThreadImpl(); + ~ProcessThreadImpl() override; - virtual int32_t RegisterModule(Module* module); - virtual int32_t DeRegisterModule(const Module* module); + int32_t Start() override; + int32_t Stop() override; -protected: - static bool Run(void* obj); + void WakeUp(Module* module) override; - bool Process(); + int32_t RegisterModule(Module* module); + int32_t DeRegisterModule(const Module* module); -private: - typedef std::list ModuleList; - EventWrapper& _timeEvent; - CriticalSectionWrapper* _critSectModules; - ModuleList _modules; - ThreadWrapper* _thread; + protected: + static bool Run(void* obj); + bool Process(); + + private: + rtc::ThreadChecker thread_checker_; + const rtc::scoped_ptr wake_up_; + rtc::scoped_ptr thread_; + + struct ModuleCallback { + ModuleCallback(Module* module) : module(module), next_callback(0) {} + bool operator==(const ModuleCallback& cb) const { + return cb.module == module; + } + Module* const module; + int64_t next_callback; // Absolute timestamp. + }; + + rtc::CriticalSection lock_; // Used to guard modules_ and stop_. + typedef std::list ModuleList; + ModuleList modules_; + bool stop_; }; + } // namespace webrtc #endif // WEBRTC_MODULES_UTILITY_SOURCE_PROCESS_THREAD_IMPL_H_ diff --git a/webrtc/modules/utility/source/process_thread_impl_unittest.cc b/webrtc/modules/utility/source/process_thread_impl_unittest.cc new file mode 100644 index 000000000..883554151 --- /dev/null +++ b/webrtc/modules/utility/source/process_thread_impl_unittest.cc @@ -0,0 +1,245 @@ +/* + * Copyright (c) 2012 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#include "testing/gmock/include/gmock/gmock.h" +#include "testing/gtest/include/gtest/gtest.h" +#include "webrtc/modules/interface/module.h" +#include "webrtc/modules/utility/source/process_thread_impl.h" +#include "webrtc/system_wrappers/interface/tick_util.h" + +namespace webrtc { + +using ::testing::_; +using ::testing::DoAll; +using ::testing::InSequence; +using ::testing::Invoke; +using ::testing::Return; +using ::testing::SetArgPointee; + +class MockModule : public Module { + public: + MOCK_METHOD0(TimeUntilNextProcess, int64_t()); + MOCK_METHOD0(Process, int32_t()); +}; + +ACTION_P(SetEvent, event) { + event->Set(); +} + +ACTION_P(Increment, counter) { + ++(*counter); +} + +ACTION_P(SetTimestamp, ptr) { + *ptr = TickTime::MillisecondTimestamp(); +} + +TEST(ProcessThreadImpl, StartStop) { + ProcessThreadImpl thread; + EXPECT_EQ(0, thread.Start()); + EXPECT_EQ(0, thread.Stop()); +} + +TEST(ProcessThreadImpl, MultipleStartStop) { + ProcessThreadImpl thread; + for (int i = 0; i < 5; ++i) { + EXPECT_EQ(0, thread.Start()); + EXPECT_EQ(0, thread.Stop()); + } +} + +// Verifies that we get at least call back to Process() on the worker thread. +TEST(ProcessThreadImpl, ProcessCall) { + ProcessThreadImpl thread; + ASSERT_EQ(0, thread.Start()); + + rtc::scoped_ptr event(EventWrapper::Create()); + + MockModule module; + EXPECT_CALL(module, TimeUntilNextProcess()).WillRepeatedly(Return(0)); + EXPECT_CALL(module, Process()) + .WillOnce(DoAll(SetEvent(event.get()), Return(0))) + .WillRepeatedly(Return(0)); + + ASSERT_EQ(0, thread.RegisterModule(&module)); + EXPECT_EQ(kEventSignaled, event->Wait(100)); + EXPECT_EQ(0, thread.Stop()); +} + +// Same as ProcessCall except the module is registered before the +// call to Start(). +TEST(ProcessThreadImpl, ProcessCall2) { + ProcessThreadImpl thread; + rtc::scoped_ptr event(EventWrapper::Create()); + + MockModule module; + EXPECT_CALL(module, TimeUntilNextProcess()).WillRepeatedly(Return(0)); + EXPECT_CALL(module, Process()) + .WillOnce(DoAll(SetEvent(event.get()), Return(0))) + .WillRepeatedly(Return(0)); + + ASSERT_EQ(0, thread.RegisterModule(&module)); + ASSERT_EQ(thread.Start(), 0); + EXPECT_EQ(kEventSignaled, event->Wait(100)); + EXPECT_EQ(thread.Stop(), 0); +} + +// Tests setting up a module for callbacks and then unregister that module. +// After unregistration, we should not receive any further callbacks. +TEST(ProcessThreadImpl, Deregister) { + ProcessThreadImpl thread; + rtc::scoped_ptr event(EventWrapper::Create()); + + int process_count = 0; + MockModule module; + EXPECT_CALL(module, TimeUntilNextProcess()).WillRepeatedly(Return(0)); + EXPECT_CALL(module, Process()) + .WillOnce(DoAll(SetEvent(event.get()), + Increment(&process_count), + Return(0))) + .WillRepeatedly(DoAll(Increment(&process_count), Return(0))); + + ASSERT_EQ(0, thread.RegisterModule(&module)); + ASSERT_EQ(0, thread.Start()); + + EXPECT_EQ(kEventSignaled, event->Wait(100)); + ASSERT_EQ(0, thread.DeRegisterModule(&module)); + EXPECT_GE(process_count, 1); + int count_after_deregister = process_count; + + // We shouldn't get any more callbacks. + EXPECT_EQ(kEventTimeout, event->Wait(20)); + EXPECT_EQ(count_after_deregister, process_count); + EXPECT_EQ(0, thread.Stop()); +} + +// Helper function for testing receiving a callback after a certain amount of +// time. There's some variance of timing built into it to reduce chance of +// flakiness on bots. +void ProcessCallAfterAFewMs(int64_t milliseconds) { + ProcessThreadImpl thread; + ASSERT_EQ(0, thread.Start()); + + rtc::scoped_ptr event(EventWrapper::Create()); + + MockModule module; + int64_t start_time = 0; + int64_t called_time = 0; + EXPECT_CALL(module, TimeUntilNextProcess()) + .WillOnce(DoAll(SetTimestamp(&start_time), + Return(milliseconds))) + .WillRepeatedly(Return(milliseconds)); + EXPECT_CALL(module, Process()) + .WillOnce(DoAll(SetTimestamp(&called_time), + SetEvent(event.get()), + Return(0))); + + EXPECT_EQ(0, thread.RegisterModule(&module)); + + // Add a buffer of 50ms due to slowness of some trybots + // (e.g. win_drmemory_light) + EXPECT_EQ(kEventSignaled, event->Wait(milliseconds + 50)); + ASSERT_EQ(0, thread.Stop()); + + ASSERT_GT(start_time, 0); + ASSERT_GT(called_time, 0); + // Use >= instead of > since due to rounding and timer accuracy (or lack + // thereof), can make the test run in "0"ms time. + EXPECT_GE(called_time, start_time); + // Check for an acceptable range. + uint32 diff = called_time - start_time; + EXPECT_GE(diff, milliseconds - 15); + EXPECT_LT(diff, milliseconds + 15); +} + +TEST(ProcessThreadImpl, ProcessCallAfter5ms) { + ProcessCallAfterAFewMs(5); +} + +TEST(ProcessThreadImpl, ProcessCallAfter50ms) { + ProcessCallAfterAFewMs(50); +} + +TEST(ProcessThreadImpl, ProcessCallAfter200ms) { + ProcessCallAfterAFewMs(200); +} + +// Runs callbacks with the goal of getting up to 50 callbacks within a second +// (on average 1 callback every 20ms). On real hardware, we're usually pretty +// close to that, but the test bots that run on virtual machines, will +// typically be in the range 30-40 callbacks. +TEST(ProcessThreadImpl, MANUAL_Process50Times) { + ProcessThreadImpl thread; + ASSERT_EQ(0, thread.Start()); + + rtc::scoped_ptr event(EventWrapper::Create()); + + MockModule module; + int callback_count = 0; + // Ask for a callback after 20ms. + EXPECT_CALL(module, TimeUntilNextProcess()) + .WillRepeatedly(Return(20)); + EXPECT_CALL(module, Process()) + .WillRepeatedly(DoAll(Increment(&callback_count), + Return(0))); + + EXPECT_EQ(0, thread.RegisterModule(&module)); + + EXPECT_EQ(kEventTimeout, event->Wait(1000)); + ASSERT_EQ(0, thread.Stop()); + + printf("Callback count: %i\n", callback_count); + // Check that we got called back up to 50 times. + // Some of the try bots run on slow virtual machines, so the lower bound + // is much more relaxed to avoid flakiness. + EXPECT_GE(callback_count, 25); + EXPECT_LE(callback_count, 50); +} + +// Tests that we can wake up the worker thread to give us a callback right +// away when we know the thread is sleeping. +TEST(ProcessThreadImpl, WakeUp) { + ProcessThreadImpl thread; + ASSERT_EQ(0, thread.Start()); + + rtc::scoped_ptr started(EventWrapper::Create()); + rtc::scoped_ptr called(EventWrapper::Create()); + + MockModule module; + int64_t start_time = 0; + int64_t called_time = 0; + // Ask for a callback after 1000ms first, then 0ms. + EXPECT_CALL(module, TimeUntilNextProcess()) + .WillOnce(DoAll(SetTimestamp(&start_time), + SetEvent(started.get()), + Return(1000))) + .WillRepeatedly(Return(0)); + EXPECT_CALL(module, Process()) + .WillOnce(DoAll(SetTimestamp(&called_time), + SetEvent(called.get()), + Return(0))) + .WillRepeatedly(Return(0)); + + EXPECT_EQ(0, thread.RegisterModule(&module)); + + EXPECT_EQ(kEventSignaled, started->Wait(100)); + thread.WakeUp(&module); + EXPECT_EQ(kEventSignaled, called->Wait(100)); + ASSERT_EQ(0, thread.Stop()); + + ASSERT_GT(start_time, 0); + ASSERT_GT(called_time, 0); + EXPECT_GE(called_time, start_time); + uint32 diff = called_time - start_time; + // We should have been called back much quicker than 1sec. + EXPECT_LE(diff, 100u); +} + +} // namespace webrtc diff --git a/webrtc/modules/video_capture/test/video_capture_unittest.cc b/webrtc/modules/video_capture/test/video_capture_unittest.cc index 8635c9c44..2909fe2d6 100644 --- a/webrtc/modules/video_capture/test/video_capture_unittest.cc +++ b/webrtc/modules/video_capture/test/video_capture_unittest.cc @@ -436,7 +436,7 @@ class VideoCaptureExternalTest : public testing::Test { public: void SetUp() { capture_module_ = VideoCaptureFactory::Create(0, capture_input_interface_); - process_module_ = webrtc::ProcessThread::CreateProcessThread(); + process_module_ = webrtc::ProcessThread::Create(); process_module_->Start(); process_module_->RegisterModule(capture_module_); @@ -464,12 +464,11 @@ class VideoCaptureExternalTest : public testing::Test { void TearDown() { process_module_->Stop(); - webrtc::ProcessThread::DestroyProcessThread(process_module_); } webrtc::VideoCaptureExternal* capture_input_interface_; webrtc::scoped_refptr capture_module_; - webrtc::ProcessThread* process_module_; + rtc::scoped_ptr process_module_; webrtc::I420VideoFrame test_frame_; TestVideoCaptureCallback capture_callback_; TestVideoCaptureFeedBack capture_feedback_; diff --git a/webrtc/video_engine/vie_shared_data.cc b/webrtc/video_engine/vie_shared_data.cc index 11f136ba5..bfb579b09 100644 --- a/webrtc/video_engine/vie_shared_data.cc +++ b/webrtc/video_engine/vie_shared_data.cc @@ -24,11 +24,11 @@ ViESharedData::ViESharedData(const Config& config) channel_manager_(new ViEChannelManager(0, number_cores_, config)), input_manager_(new ViEInputManager(0, config)), render_manager_(new ViERenderManager(0)), - module_process_thread_(ProcessThread::CreateProcessThread()), + module_process_thread_(ProcessThread::Create()), last_error_(0) { Trace::CreateTrace(); - channel_manager_->SetModuleProcessThread(module_process_thread_); - input_manager_->SetModuleProcessThread(module_process_thread_); + channel_manager_->SetModuleProcessThread(module_process_thread_.get()); + input_manager_->SetModuleProcessThread(module_process_thread_.get()); module_process_thread_->Start(); } @@ -39,7 +39,6 @@ ViESharedData::~ViESharedData() { render_manager_.reset(); module_process_thread_->Stop(); - ProcessThread::DestroyProcessThread(module_process_thread_); Trace::ReturnTrace(); } diff --git a/webrtc/video_engine/vie_shared_data.h b/webrtc/video_engine/vie_shared_data.h index 0d8d100af..43b41d04f 100644 --- a/webrtc/video_engine/vie_shared_data.h +++ b/webrtc/video_engine/vie_shared_data.h @@ -16,7 +16,7 @@ #include -#include "webrtc/system_wrappers/interface/scoped_ptr.h" +#include "webrtc/base/scoped_ptr.h" namespace webrtc { @@ -48,10 +48,10 @@ class ViESharedData { private: const int number_cores_; - scoped_ptr channel_manager_; - scoped_ptr input_manager_; - scoped_ptr render_manager_; - ProcessThread* module_process_thread_; + rtc::scoped_ptr channel_manager_; + rtc::scoped_ptr input_manager_; + rtc::scoped_ptr render_manager_; + rtc::scoped_ptr module_process_thread_; mutable int last_error_; std::map overuse_observers_; diff --git a/webrtc/voice_engine/shared_data.cc b/webrtc/voice_engine/shared_data.cc index 218784474..ad00e038f 100644 --- a/webrtc/voice_engine/shared_data.cc +++ b/webrtc/voice_engine/shared_data.cc @@ -29,7 +29,7 @@ SharedData::SharedData(const Config& config) : _channelManager(_gInstanceCounter, config), _engineStatistics(_gInstanceCounter), _audioDevicePtr(NULL), - _moduleProcessThreadPtr(ProcessThread::CreateProcessThread()), + _moduleProcessThreadPtr(ProcessThread::Create()), _externalRecording(false), _externalPlayout(false) { @@ -55,7 +55,7 @@ SharedData::~SharedData() _audioDevicePtr->Release(); } delete _apiCritPtr; - ProcessThread::DestroyProcessThread(_moduleProcessThreadPtr); + _moduleProcessThreadPtr->Stop(); Trace::ReturnTrace(); } diff --git a/webrtc/voice_engine/shared_data.h b/webrtc/voice_engine/shared_data.h index dd76e96ab..3ab1d45ac 100644 --- a/webrtc/voice_engine/shared_data.h +++ b/webrtc/voice_engine/shared_data.h @@ -11,10 +11,10 @@ #ifndef WEBRTC_VOICE_ENGINE_SHARED_DATA_H #define WEBRTC_VOICE_ENGINE_SHARED_DATA_H +#include "webrtc/base/scoped_ptr.h" #include "webrtc/modules/audio_device/include/audio_device.h" #include "webrtc/modules/audio_processing/include/audio_processing.h" #include "webrtc/modules/utility/interface/process_thread.h" -#include "webrtc/system_wrappers/interface/scoped_ptr.h" #include "webrtc/voice_engine/channel_manager.h" #include "webrtc/voice_engine/statistics.h" #include "webrtc/voice_engine/voice_engine_defines.h" @@ -48,7 +48,7 @@ public: void set_ext_recording(bool value) { _externalRecording = value; } bool ext_playout() const { return _externalPlayout; } void set_ext_playout(bool value) { _externalPlayout = value; } - ProcessThread* process_thread() { return _moduleProcessThreadPtr; } + ProcessThread* process_thread() { return _moduleProcessThreadPtr.get(); } AudioDeviceModule::AudioLayer audio_device_layer() const { return _audioDeviceLayer; } @@ -73,8 +73,8 @@ protected: AudioDeviceModule* _audioDevicePtr; OutputMixer* _outputMixerPtr; TransmitMixer* _transmitMixerPtr; - scoped_ptr audioproc_; - ProcessThread* _moduleProcessThreadPtr; + rtc::scoped_ptr audioproc_; + rtc::scoped_ptr _moduleProcessThreadPtr; bool _externalRecording; bool _externalPlayout;