Adding basic support for posting tasks to a process thread.

BUG=
R=magjed@webrtc.org

Review URL: https://webrtc-codereview.appspot.com/41099004

Cr-Commit-Position: refs/heads/master@{#8614}
git-svn-id: http://webrtc.googlecode.com/svn/trunk@8614 4adac7df-926f-26a2-2b94-8c16560cd09d
This commit is contained in:
tommi@webrtc.org 2015-03-05 13:13:42 +00:00
parent 658d2015f3
commit 03054486f5
5 changed files with 74 additions and 1 deletions

View File

@ -22,8 +22,16 @@ class MockProcessThread : public ProcessThread {
MOCK_METHOD0(Start, void());
MOCK_METHOD0(Stop, void());
MOCK_METHOD1(WakeUp, void(Module* module));
MOCK_METHOD1(PostTask, void(ProcessTask* task));
MOCK_METHOD1(RegisterModule, void(Module* module));
MOCK_METHOD1(DeRegisterModule, void(Module* module));
// MOCK_METHOD1 gets confused with mocking this method, so we work around it
// by overriding the method from the interface and forwarding the call to a
// mocked, simpler method.
void PostTask(rtc::scoped_ptr<ProcessTask> task) override {
PostTask(task.get());
}
};
} // namespace webrtc

View File

@ -17,6 +17,14 @@
namespace webrtc {
class Module;
class ProcessTask {
public:
ProcessTask() {}
virtual ~ProcessTask() {}
virtual void Run() = 0;
};
class ProcessThread {
public:
virtual ~ProcessThread();
@ -36,6 +44,14 @@ class ProcessThread {
// Can be called on any thread.
virtual void WakeUp(Module* module) = 0;
// Queues a task object to run on the worker thread. Ownership of the
// task object is transferred to the ProcessThread and the object will
// either be deleted after running on the worker thread, or on the
// construction thread of the ProcessThread instance, if the task did not
// get a chance to run (e.g. posting the task while shutting down or when
// the thread never runs).
virtual void PostTask(rtc::scoped_ptr<ProcessTask> task) = 0;
// Adds a module that will start to receive callbacks on the worker thread.
// Can be called from any thread.
virtual void RegisterModule(Module* module) = 0;

View File

@ -51,6 +51,11 @@ ProcessThreadImpl::~ProcessThreadImpl() {
DCHECK(thread_checker_.CalledOnValidThread());
DCHECK(!thread_.get());
DCHECK(!stop_);
while (!queue_.empty()) {
delete queue_.front();
queue_.pop();
}
}
void ProcessThreadImpl::Start() {
@ -102,6 +107,15 @@ void ProcessThreadImpl::WakeUp(Module* module) {
wake_up_->Set();
}
void ProcessThreadImpl::PostTask(rtc::scoped_ptr<ProcessTask> task) {
// Allowed to be called on any thread.
{
rtc::CritScope lock(&lock_);
queue_.push(task.release());
}
wake_up_->Set();
}
void ProcessThreadImpl::RegisterModule(Module* module) {
// Allowed to be called on any thread.
DCHECK(module);
@ -155,6 +169,7 @@ bool ProcessThreadImpl::Run(void* obj) {
bool ProcessThreadImpl::Process() {
int64_t now = TickTime::MillisecondTimestamp();
int64_t next_checkpoint = now + (1000 * 60);
{
rtc::CritScope lock(&lock_);
if (stop_)
@ -180,6 +195,15 @@ bool ProcessThreadImpl::Process() {
if (m.next_callback < next_checkpoint)
next_checkpoint = m.next_callback;
}
while (!queue_.empty()) {
ProcessTask* task = queue_.front();
queue_.pop();
lock_.Leave();
task->Run();
delete task;
lock_.Enter();
}
}
int64_t time_to_wait = next_checkpoint - TickTime::MillisecondTimestamp();

View File

@ -12,6 +12,7 @@
#define WEBRTC_MODULES_UTILITY_SOURCE_PROCESS_THREAD_IMPL_H_
#include <list>
#include <queue>
#include "webrtc/base/criticalsection.h"
#include "webrtc/base/thread_checker.h"
@ -31,6 +32,7 @@ class ProcessThreadImpl : public ProcessThread {
void Stop() override;
void WakeUp(Module* module) override;
void PostTask(rtc::scoped_ptr<ProcessTask> task) override;
void RegisterModule(Module* module) override;
void DeRegisterModule(Module* module) override;
@ -64,13 +66,15 @@ class ProcessThreadImpl : public ProcessThread {
// issues, but I haven't figured out what they are, if there are alignment
// requirements for mutexes on Mac or if there's something else to it.
// So be careful with changing the layout.
rtc::CriticalSection lock_; // Used to guard modules_ and stop_.
rtc::CriticalSection lock_; // Used to guard modules_, tasks_ and stop_.
rtc::ThreadChecker thread_checker_;
const rtc::scoped_ptr<EventWrapper> wake_up_;
rtc::scoped_ptr<ThreadWrapper> thread_;
ModuleList modules_;
// TODO(tommi): Support delayed tasks.
std::queue<ProcessTask*> queue_;
bool stop_;
};

View File

@ -30,6 +30,15 @@ class MockModule : public Module {
MOCK_METHOD1(ProcessThreadAttached, void(ProcessThread*));
};
class RaiseEventTask : public ProcessTask {
public:
RaiseEventTask(EventWrapper* event) : event_(event) {}
void Run() override { event_->Set(); }
private:
EventWrapper* event_;
};
ACTION_P(SetEvent, event) {
event->Set();
}
@ -280,4 +289,16 @@ TEST(ProcessThreadImpl, WakeUp) {
EXPECT_LE(diff, 100u);
}
// Tests that we can post a task that gets run straight away on the worker
// thread.
TEST(ProcessThreadImpl, PostTask) {
ProcessThreadImpl thread;
rtc::scoped_ptr<EventWrapper> task_ran(EventWrapper::Create());
rtc::scoped_ptr<RaiseEventTask> task(new RaiseEventTask(task_ran.get()));
thread.Start();
thread.PostTask(task.Pass());
EXPECT_EQ(kEventSignaled, task_ran->Wait(100));
thread.Stop();
}
} // namespace webrtc