ProcessThread improvements.

* Added a way to notify a Module that it's been attached to a ProcessThread.
  The benefit of this is to give the module a way to wake up the thread
  when it needs work to happen on the worker thread, immediately.
  Today, module instances are typically registered with a process thread
  outside the control of the modules themselves.  I.e. they typically
  don't know about the process thread they're attached to.

* Improve ProcessThread's WakeUp algorithm to not call TimeUntilNextProcess
  when a WakeUp call is requested.  This is an optimization for the above
  case which avoids the module having to acquire a lock or do an interlocked
  operation before calling WakeUp(), which would ensure the module's
  TimeUntilNextProcess() implementation would return 0.

BUG=2822
R=stefan@webrtc.org

Review URL: https://webrtc-codereview.appspot.com/39239004

Cr-Commit-Position: refs/heads/master@{#8527}
git-svn-id: http://webrtc.googlecode.com/svn/trunk@8527 4adac7df-926f-26a2-2b94-8c16560cd09d
This commit is contained in:
tommi@webrtc.org 2015-02-27 13:36:34 +00:00
parent f296859c83
commit 3985f0151a
11 changed files with 160 additions and 137 deletions

View File

@ -15,6 +15,8 @@
namespace webrtc {
class ProcessThread;
class Module {
public:
// Returns the number of milliseconds until the module wants a worker
@ -32,6 +34,27 @@ class Module {
// Called on a worker thread.
virtual int32_t Process() = 0;
// This method is called when the module is attached to a *running* process
// thread or detached from one. In the case of detaching, |process_thread|
// will be nullptr.
//
// This method will be called in the following cases:
//
// * Non-null process_thread:
// * ProcessThread::RegisterModule() is called while the thread is running.
// * ProcessThread::Start() is called and RegisterModule has previously
// been called. The thread will be started immediately after notifying
// all modules.
//
// * Null process_thread:
// * ProcessThread::DeRegisterModule() is called while the thread is
// running.
// * ProcessThread::Stop() was called and the thread has been stopped.
//
// NOTE: This method is not called from the worker thread itself, but from
// the thread that registers/deregisters the module or calls Start/Stop.
virtual void ProcessThreadAttached(ProcessThread* process_thread) {}
protected:
virtual ~Module() {}
};

View File

@ -19,11 +19,11 @@ namespace webrtc {
class MockProcessThread : public ProcessThread {
public:
MOCK_METHOD0(Start, int32_t());
MOCK_METHOD0(Stop, int32_t());
MOCK_METHOD0(Start, void());
MOCK_METHOD0(Stop, void());
MOCK_METHOD1(WakeUp, void(Module* module));
MOCK_METHOD1(RegisterModule, int32_t(Module* module));
MOCK_METHOD1(DeRegisterModule, int32_t(const Module* module));
MOCK_METHOD1(RegisterModule, void(Module* module));
MOCK_METHOD1(DeRegisterModule, void(Module* module));
};
} // namespace webrtc

View File

@ -24,10 +24,10 @@ class ProcessThread {
static rtc::scoped_ptr<ProcessThread> Create();
// Starts the worker thread. Must be called from the construction thread.
virtual int32_t Start() = 0;
virtual void Start() = 0;
// Stops the worker thread. Must be called from the construction thread.
virtual int32_t Stop() = 0;
virtual void Stop() = 0;
// Wakes the thread up to give a module a chance to do processing right
// away. This causes the worker thread to wake up and requery the specified
@ -38,11 +38,11 @@ class ProcessThread {
// Adds a module that will start to receive callbacks on the worker thread.
// Can be called from any thread.
virtual int32_t RegisterModule(Module* module) = 0;
virtual void RegisterModule(Module* module) = 0;
// Removes a previously registered module.
// Can be called from any thread.
virtual int32_t DeRegisterModule(const Module* module) = 0;
virtual void DeRegisterModule(Module* module) = 0;
};
} // namespace webrtc

View File

@ -17,6 +17,12 @@
namespace webrtc {
namespace {
// We use this constant internally to signal that a module has requested
// a callback right away. When this is set, no call to TimeUntilNextProcess
// should be made, but Process() should be called directly.
const int64_t kCallProcessImmediately = -1;
int64_t GetNextCallbackTime(Module* module, int64_t time_now) {
int64_t interval = module->TimeUntilNextProcess();
// Currently some implementations erroneously return error codes from
@ -47,28 +53,27 @@ ProcessThreadImpl::~ProcessThreadImpl() {
DCHECK(!stop_);
}
int32_t ProcessThreadImpl::Start() {
void ProcessThreadImpl::Start() {
DCHECK(thread_checker_.CalledOnValidThread());
DCHECK(!thread_.get());
if (thread_.get())
return -1;
return;
DCHECK(!stop_);
for (ModuleCallback& m : modules_)
m.module->ProcessThreadAttached(this);
thread_.reset(ThreadWrapper::CreateThread(
&ProcessThreadImpl::Run, this, kNormalPriority, "ProcessThread"));
unsigned int id;
if (!thread_->Start(id)) {
thread_.reset();
return -1;
}
return 0;
CHECK(thread_->Start(id));
}
int32_t ProcessThreadImpl::Stop() {
void ProcessThreadImpl::Stop() {
DCHECK(thread_checker_.CalledOnValidThread());
if(!thread_.get())
return 0;
return;
{
rtc::CritScope lock(&lock_);
@ -77,11 +82,12 @@ int32_t ProcessThreadImpl::Stop() {
wake_up_->Set();
thread_->Stop();
CHECK(thread_->Stop());
thread_.reset();
stop_ = false;
return 0;
for (ModuleCallback& m : modules_)
m.module->ProcessThreadAttached(nullptr);
}
void ProcessThreadImpl::WakeUp(Module* module) {
@ -90,23 +96,33 @@ void ProcessThreadImpl::WakeUp(Module* module) {
rtc::CritScope lock(&lock_);
for (ModuleCallback& m : modules_) {
if (m.module == module)
m.next_callback = 0;
m.next_callback = kCallProcessImmediately;
}
}
wake_up_->Set();
}
int32_t ProcessThreadImpl::RegisterModule(Module* module) {
void ProcessThreadImpl::RegisterModule(Module* module) {
// Allowed to be called on any thread.
DCHECK(module);
#if (!defined(NDEBUG) || defined(DCHECK_ALWAYS_ON))
{
// Catch programmer error.
rtc::CritScope lock(&lock_);
for (const ModuleCallback& mc : modules_)
DCHECK(mc.module != module);
}
#endif
// Now that we know the module isn't in the list, we'll call out to notify
// the module that it's attached to the worker thread. We don't hold
// the lock while we make this call.
if (thread_.get())
module->ProcessThreadAttached(this);
{
rtc::CritScope lock(&lock_);
// Only allow module to be registered once.
for (const ModuleCallback& mc : modules_) {
if (mc.module == module)
return -1;
}
modules_.push_back(ModuleCallback(module));
}
@ -114,18 +130,21 @@ int32_t ProcessThreadImpl::RegisterModule(Module* module) {
// waiting time. The waiting time for the just registered module may be
// shorter than all other registered modules.
wake_up_->Set();
return 0;
}
int32_t ProcessThreadImpl::DeRegisterModule(const Module* module) {
void ProcessThreadImpl::DeRegisterModule(Module* module) {
// Allowed to be called on any thread.
DCHECK(module);
rtc::CritScope lock(&lock_);
modules_.remove_if([&module](const ModuleCallback& m) {
return m.module == module;
});
return 0;
{
rtc::CritScope lock(&lock_);
modules_.remove_if([&module](const ModuleCallback& m) {
return m.module == module;
});
}
// Notify the module that it's been detached, while not holding the lock.
if (thread_.get())
module->ProcessThreadAttached(nullptr);
}
// static
@ -148,7 +167,8 @@ bool ProcessThreadImpl::Process() {
if (m.next_callback == 0)
m.next_callback = GetNextCallbackTime(m.module, now);
if (m.next_callback <= now) {
if (m.next_callback <= now ||
m.next_callback == kCallProcessImmediately) {
m.module->Process();
// Use a new 'now' reference to calculate when the next callback
// should occur. We'll continue to use 'now' above for the baseline

View File

@ -27,13 +27,13 @@ class ProcessThreadImpl : public ProcessThread {
ProcessThreadImpl();
~ProcessThreadImpl() override;
int32_t Start() override;
int32_t Stop() override;
void Start() override;
void Stop() override;
void WakeUp(Module* module) override;
int32_t RegisterModule(Module* module) override;
int32_t DeRegisterModule(const Module* module) override;
void RegisterModule(Module* module) override;
void DeRegisterModule(Module* module) override;
protected:
static bool Run(void* obj);

View File

@ -27,6 +27,7 @@ class MockModule : public Module {
public:
MOCK_METHOD0(TimeUntilNextProcess, int64_t());
MOCK_METHOD0(Process, int32_t());
MOCK_METHOD1(ProcessThreadAttached, void(ProcessThread*));
};
ACTION_P(SetEvent, event) {
@ -43,22 +44,22 @@ ACTION_P(SetTimestamp, ptr) {
TEST(ProcessThreadImpl, StartStop) {
ProcessThreadImpl thread;
EXPECT_EQ(0, thread.Start());
EXPECT_EQ(0, thread.Stop());
thread.Start();
thread.Stop();
}
TEST(ProcessThreadImpl, MultipleStartStop) {
ProcessThreadImpl thread;
for (int i = 0; i < 5; ++i) {
EXPECT_EQ(0, thread.Start());
EXPECT_EQ(0, thread.Stop());
thread.Start();
thread.Stop();
}
}
// Verifies that we get at least call back to Process() on the worker thread.
TEST(ProcessThreadImpl, ProcessCall) {
ProcessThreadImpl thread;
ASSERT_EQ(0, thread.Start());
thread.Start();
rtc::scoped_ptr<EventWrapper> event(EventWrapper::Create());
@ -67,10 +68,13 @@ TEST(ProcessThreadImpl, ProcessCall) {
EXPECT_CALL(module, Process())
.WillOnce(DoAll(SetEvent(event.get()), Return(0)))
.WillRepeatedly(Return(0));
EXPECT_CALL(module, ProcessThreadAttached(&thread)).Times(1);
ASSERT_EQ(0, thread.RegisterModule(&module));
thread.RegisterModule(&module);
EXPECT_EQ(kEventSignaled, event->Wait(100));
EXPECT_EQ(0, thread.Stop());
EXPECT_CALL(module, ProcessThreadAttached(nullptr)).Times(1);
thread.Stop();
}
// Same as ProcessCall except the module is registered before the
@ -85,10 +89,14 @@ TEST(ProcessThreadImpl, ProcessCall2) {
.WillOnce(DoAll(SetEvent(event.get()), Return(0)))
.WillRepeatedly(Return(0));
ASSERT_EQ(0, thread.RegisterModule(&module));
ASSERT_EQ(thread.Start(), 0);
thread.RegisterModule(&module);
EXPECT_CALL(module, ProcessThreadAttached(&thread)).Times(1);
thread.Start();
EXPECT_EQ(kEventSignaled, event->Wait(100));
EXPECT_EQ(thread.Stop(), 0);
EXPECT_CALL(module, ProcessThreadAttached(nullptr)).Times(1);
thread.Stop();
}
// Tests setting up a module for callbacks and then unregister that module.
@ -106,18 +114,23 @@ TEST(ProcessThreadImpl, Deregister) {
Return(0)))
.WillRepeatedly(DoAll(Increment(&process_count), Return(0)));
ASSERT_EQ(0, thread.RegisterModule(&module));
ASSERT_EQ(0, thread.Start());
thread.RegisterModule(&module);
EXPECT_CALL(module, ProcessThreadAttached(&thread)).Times(1);
thread.Start();
EXPECT_EQ(kEventSignaled, event->Wait(100));
ASSERT_EQ(0, thread.DeRegisterModule(&module));
EXPECT_CALL(module, ProcessThreadAttached(nullptr)).Times(1);
thread.DeRegisterModule(&module);
EXPECT_GE(process_count, 1);
int count_after_deregister = process_count;
// We shouldn't get any more callbacks.
EXPECT_EQ(kEventTimeout, event->Wait(20));
EXPECT_EQ(count_after_deregister, process_count);
EXPECT_EQ(0, thread.Stop());
thread.Stop();
}
// Helper function for testing receiving a callback after a certain amount of
@ -125,7 +138,7 @@ TEST(ProcessThreadImpl, Deregister) {
// flakiness on bots.
void ProcessCallAfterAFewMs(int64_t milliseconds) {
ProcessThreadImpl thread;
ASSERT_EQ(0, thread.Start());
thread.Start();
rtc::scoped_ptr<EventWrapper> event(EventWrapper::Create());
@ -142,12 +155,15 @@ void ProcessCallAfterAFewMs(int64_t milliseconds) {
Return(0)))
.WillRepeatedly(Return(0));
EXPECT_EQ(0, thread.RegisterModule(&module));
EXPECT_CALL(module, ProcessThreadAttached(&thread)).Times(1);
thread.RegisterModule(&module);
// Add a buffer of 50ms due to slowness of some trybots
// (e.g. win_drmemory_light)
EXPECT_EQ(kEventSignaled, event->Wait(milliseconds + 50));
ASSERT_EQ(0, thread.Stop());
EXPECT_CALL(module, ProcessThreadAttached(nullptr)).Times(1);
thread.Stop();
ASSERT_GT(start_time, 0);
ASSERT_GT(called_time, 0);
@ -187,7 +203,7 @@ TEST(ProcessThreadImpl, DISABLED_ProcessCallAfter200ms) {
// TODO(tommi): Fix.
TEST(ProcessThreadImpl, DISABLED_Process50Times) {
ProcessThreadImpl thread;
ASSERT_EQ(0, thread.Start());
thread.Start();
rtc::scoped_ptr<EventWrapper> event(EventWrapper::Create());
@ -200,10 +216,13 @@ TEST(ProcessThreadImpl, DISABLED_Process50Times) {
.WillRepeatedly(DoAll(Increment(&callback_count),
Return(0)));
EXPECT_EQ(0, thread.RegisterModule(&module));
EXPECT_CALL(module, ProcessThreadAttached(&thread)).Times(1);
thread.RegisterModule(&module);
EXPECT_EQ(kEventTimeout, event->Wait(1000));
ASSERT_EQ(0, thread.Stop());
EXPECT_CALL(module, ProcessThreadAttached(nullptr)).Times(1);
thread.Stop();
printf("Callback count: %i\n", callback_count);
// Check that we got called back up to 50 times.
@ -217,7 +236,7 @@ TEST(ProcessThreadImpl, DISABLED_Process50Times) {
// away when we know the thread is sleeping.
TEST(ProcessThreadImpl, WakeUp) {
ProcessThreadImpl thread;
ASSERT_EQ(0, thread.Start());
thread.Start();
rtc::scoped_ptr<EventWrapper> started(EventWrapper::Create());
rtc::scoped_ptr<EventWrapper> called(EventWrapper::Create());
@ -225,24 +244,33 @@ TEST(ProcessThreadImpl, WakeUp) {
MockModule module;
int64_t start_time = 0;
int64_t called_time = 0;
// Ask for a callback after 1000ms first, then 0ms.
// Ask for a callback after 1000ms.
// TimeUntilNextProcess will be called twice.
// The first time we use it to get the thread into a waiting state.
// Then we wake the thread and there should not be another call made to
// TimeUntilNextProcess before Process() is called.
// The second time TimeUntilNextProcess is then called, is after Process
// has been called and we don't expect any more calls.
EXPECT_CALL(module, TimeUntilNextProcess())
.WillOnce(DoAll(SetTimestamp(&start_time),
SetEvent(started.get()),
Return(1000)))
.WillRepeatedly(Return(0));
.WillOnce(Return(1000));
EXPECT_CALL(module, Process())
.WillOnce(DoAll(SetTimestamp(&called_time),
SetEvent(called.get()),
Return(0)))
.WillRepeatedly(Return(0));
EXPECT_EQ(0, thread.RegisterModule(&module));
EXPECT_CALL(module, ProcessThreadAttached(&thread)).Times(1);
thread.RegisterModule(&module);
EXPECT_EQ(kEventSignaled, started->Wait(100));
thread.WakeUp(&module);
EXPECT_EQ(kEventSignaled, called->Wait(100));
ASSERT_EQ(0, thread.Stop());
EXPECT_CALL(module, ProcessThreadAttached(nullptr)).Times(1);
thread.Stop();
ASSERT_GT(start_time, 0);
ASSERT_GT(called_time, 0);

View File

@ -150,10 +150,7 @@ int32_t ViECapturer::Init(VideoCaptureModule* capture_module) {
capture_module_ = capture_module;
capture_module_->RegisterCaptureDataCallback(*this);
capture_module_->AddRef();
if (module_process_thread_.RegisterModule(capture_module_) != 0) {
return -1;
}
module_process_thread_.RegisterModule(capture_module_);
return 0;
}
@ -189,9 +186,7 @@ int32_t ViECapturer::Init(const char* device_unique_idUTF8,
}
capture_module_->AddRef();
capture_module_->RegisterCaptureDataCallback(*this);
if (module_process_thread_.RegisterModule(capture_module_) != 0) {
return -1;
}
module_process_thread_.RegisterModule(capture_module_);
return 0;
}

View File

@ -141,15 +141,12 @@ ViEChannel::ViEChannel(int32_t channel_id,
}
int32_t ViEChannel::Init() {
if (module_process_thread_.RegisterModule(
vie_receiver_.GetReceiveStatistics()) != 0) {
return -1;
}
module_process_thread_.RegisterModule(vie_receiver_.GetReceiveStatistics());
// RTP/RTCP initialization.
rtp_rtcp_->SetSendingMediaStatus(false);
if (module_process_thread_.RegisterModule(rtp_rtcp_.get()) != 0) {
return -1;
}
module_process_thread_.RegisterModule(rtp_rtcp_.get());
rtp_rtcp_->SetKeyFrameRequestMethod(kKeyFrameReqFirRtp);
rtp_rtcp_->SetRTCPStatus(kRtcpCompound);
if (paced_sender_) {
@ -173,9 +170,10 @@ int32_t ViEChannel::Init() {
vcm_->RegisterReceiveStatisticsCallback(this);
vcm_->RegisterDecoderTimingCallback(this);
vcm_->SetRenderDelay(kViEDefaultRenderDelayMs);
if (module_process_thread_.RegisterModule(vcm_) != 0) {
return -1;
}
module_process_thread_.RegisterModule(vcm_);
module_process_thread_.RegisterModule(&vie_sync_);
#ifdef VIDEOCODEC_VP8
if (!disable_default_encoder_) {
VideoCodec video_codec;
@ -1835,13 +1833,7 @@ int32_t ViEChannel::StopDecodeThread() {
}
int32_t ViEChannel::SetVoiceChannel(int32_t ve_channel_id,
VoEVideoSync* ve_sync_interface) {
if (ve_sync_interface) {
// Register lip sync
module_process_thread_.RegisterModule(&vie_sync_);
} else {
module_process_thread_.DeRegisterModule(&vie_sync_);
}
VoEVideoSync* ve_sync_interface) {
return vie_sync_.ConfigureSync(ve_channel_id,
ve_sync_interface,
rtp_rtcp_.get(),

View File

@ -914,12 +914,8 @@ Channel::~Channel()
" (Audio coding module)");
}
// De-register modules in process thread
if (_moduleProcessThreadPtr->DeRegisterModule(_rtpRtcpModule.get()) == -1)
{
WEBRTC_TRACE(kTraceInfo, kTraceVoice,
VoEId(_instanceId,_channelId),
"~Channel() failed to deregister RTP/RTCP module");
}
_moduleProcessThreadPtr->DeRegisterModule(_rtpRtcpModule.get());
// End of modules shutdown
// Delete other objects
@ -955,16 +951,8 @@ Channel::Init()
// --- Add modules to process thread (for periodic schedulation)
const bool processThreadFail =
((_moduleProcessThreadPtr->RegisterModule(_rtpRtcpModule.get()) != 0) ||
false);
if (processThreadFail)
{
_engineStatisticsPtr->SetLastError(
VE_CANNOT_INIT_CHANNEL, kTraceError,
"Channel::Init() modules not registered");
return -1;
}
_moduleProcessThreadPtr->RegisterModule(_rtpRtcpModule.get());
// --- ACM initialization
if ((audio_coding_->InitializeReceiver() == -1) ||

View File

@ -259,15 +259,8 @@ TransmitMixer::SetEngineInformation(ProcessThread& processThread,
_engineStatisticsPtr = &engineStatistics;
_channelManagerPtr = &channelManager;
if (_processThreadPtr->RegisterModule(&_monitorModule) == -1)
{
WEBRTC_TRACE(kTraceWarning, kTraceVoice, VoEId(_instanceId, -1),
"TransmitMixer::SetEngineInformation() failed to"
"register the monitor module");
} else
{
_monitorModule.RegisterObserver(*this);
}
_processThreadPtr->RegisterModule(&_monitorModule);
_monitorModule.RegisterObserver(*this);
return 0;
}

View File

@ -314,12 +314,7 @@ int VoEBaseImpl::Init(AudioDeviceModule* external_adm,
if (_shared->process_thread())
{
if (_shared->process_thread()->Start() != 0)
{
_shared->SetLastError(VE_THREAD_ERROR, kTraceError,
"Init() failed to start module process thread");
return -1;
}
_shared->process_thread()->Start();
}
// Create an internal ADM if the user has not added an external
@ -347,12 +342,9 @@ int VoEBaseImpl::Init(AudioDeviceModule* external_adm,
// Register the ADM to the process thread, which will drive the error
// callback mechanism
if (_shared->process_thread() &&
_shared->process_thread()->RegisterModule(_shared->audio_device()) != 0)
if (_shared->process_thread())
{
_shared->SetLastError(VE_AUDIO_DEVICE_MODULE_ERROR, kTraceError,
"Init() failed to register the ADM");
return -1;
_shared->process_thread()->RegisterModule(_shared->audio_device());
}
bool available(false);
@ -945,18 +937,10 @@ int32_t VoEBaseImpl::TerminateInternal()
{
if (_shared->audio_device())
{
if (_shared->process_thread()->
DeRegisterModule(_shared->audio_device()) != 0)
{
_shared->SetLastError(VE_THREAD_ERROR, kTraceError,
"TerminateInternal() failed to deregister ADM");
}
}
if (_shared->process_thread()->Stop() != 0)
{
_shared->SetLastError(VE_THREAD_ERROR, kTraceError,
"TerminateInternal() failed to stop module process thread");
_shared->process_thread()->DeRegisterModule(
_shared->audio_device());
}
_shared->process_thread()->Stop();
}
if (_shared->audio_device())