Remove EventWrapper::Reset().

This simplifies the event wrapper which we've recently found issues in.
Also refactoring EndToEndTest.RespectsNetworkState to not depend on it.

BUG=
R=stefan@webrtc.org, tommi@webrtc.org

Review URL: https://webrtc-codereview.appspot.com/41939004

Cr-Commit-Position: refs/heads/master@{#8366}
git-svn-id: http://webrtc.googlecode.com/svn/trunk@8366 4adac7df-926f-26a2-2b94-8c16560cd09d
This commit is contained in:
pbos@webrtc.org 2015-02-13 14:58:18 +00:00
parent 5a7dc39277
commit d5ce2e63df
14 changed files with 79 additions and 102 deletions

View File

@ -214,7 +214,6 @@ int32_t AudioTrackJni::Terminate() {
// If we close thread anyway, the app will crash // If we close thread anyway, the app will crash
return -1; return -1;
} }
_playStartStopEvent.Reset();
_critSect.Enter(); _critSect.Enter();
// Close down play thread // Close down play thread
@ -521,7 +520,6 @@ int32_t AudioTrackJni::StartPlayout() {
WEBRTC_TRACE(kTraceError, kTraceAudioDevice, _id, WEBRTC_TRACE(kTraceError, kTraceAudioDevice, _id,
" Timeout or error starting"); " Timeout or error starting");
} }
_playStartStopEvent.Reset();
_critSect.Enter(); _critSect.Enter();
// Detach this thread if it was attached // Detach this thread if it was attached
@ -1284,7 +1282,6 @@ bool AudioTrackJni::PlayThreadProcess()
case kEventSignaled: case kEventSignaled:
WEBRTC_TRACE(kTraceDebug, kTraceAudioDevice, WEBRTC_TRACE(kTraceDebug, kTraceAudioDevice,
_id, "Playout thread event signal"); _id, "Playout thread event signal");
_timeEventPlay.Reset();
break; break;
case kEventError: case kEventError:
WEBRTC_TRACE(kTraceWarning, kTraceAudioDevice, WEBRTC_TRACE(kTraceWarning, kTraceAudioDevice,

View File

@ -2636,7 +2636,6 @@ bool AudioDeviceLinuxPulse::PlayThreadProcess()
switch (_timeEventPlay.Wait(1000)) switch (_timeEventPlay.Wait(1000))
{ {
case kEventSignaled: case kEventSignaled:
_timeEventPlay.Reset();
break; break;
case kEventError: case kEventError:
WEBRTC_TRACE(kTraceWarning, kTraceAudioDevice, _id, WEBRTC_TRACE(kTraceWarning, kTraceAudioDevice, _id,
@ -2878,7 +2877,6 @@ bool AudioDeviceLinuxPulse::RecThreadProcess()
switch (_timeEventRec.Wait(1000)) switch (_timeEventRec.Wait(1000))
{ {
case kEventSignaled: case kEventSignaled:
_timeEventRec.Reset();
break; break;
case kEventError: case kEventError:
WEBRTC_TRACE(kTraceWarning, kTraceAudioDevice, _id, WEBRTC_TRACE(kTraceWarning, kTraceAudioDevice, _id,

View File

@ -214,7 +214,6 @@ void VCMJitterBuffer::Start() {
time_first_packet_ms_ = 0; time_first_packet_ms_ = 0;
// Start in a non-signaled state. // Start in a non-signaled state.
frame_event_->Reset();
waiting_for_completion_.frame_size = 0; waiting_for_completion_.frame_size = 0;
waiting_for_completion_.timestamp = 0; waiting_for_completion_.timestamp = 0;
waiting_for_completion_.latest_packet_time = -1; waiting_for_completion_.latest_packet_time = -1;
@ -258,7 +257,6 @@ void VCMJitterBuffer::Flush() {
decodable_frames_.Reset(&free_frames_); decodable_frames_.Reset(&free_frames_);
incomplete_frames_.Reset(&free_frames_); incomplete_frames_.Reset(&free_frames_);
last_decoded_state_.Reset(); // TODO(mikhal): sync reset. last_decoded_state_.Reset(); // TODO(mikhal): sync reset.
frame_event_->Reset();
num_consecutive_old_packets_ = 0; num_consecutive_old_packets_ = 0;
// Also reset the jitter and delay estimates // Also reset the jitter and delay estimates
jitter_estimate_.Reset(); jitter_estimate_.Reset();
@ -409,10 +407,6 @@ bool VCMJitterBuffer::NextCompleteTimestamp(
break; break;
} }
} }
// Inside |crit_sect_|.
} else {
// We already have a frame, reset the event.
frame_event_->Reset();
} }
if (decodable_frames_.empty() || if (decodable_frames_.empty() ||
decodable_frames_.Front()->GetState() != kStateComplete) { decodable_frames_.Front()->GetState() != kStateComplete) {

View File

@ -49,7 +49,6 @@ void VCMReceiver::Reset() {
} else { } else {
jitter_buffer_.Flush(); jitter_buffer_.Flush();
} }
render_wait_event_->Reset();
state_ = kReceiving; state_ = kReceiving;
} }

View File

@ -35,9 +35,6 @@ class EventWrapper {
// either immediately or some time in the future. // either immediately or some time in the future.
virtual bool Set() = 0; virtual bool Set() = 0;
// Prevents future Wait() calls from finishing without a new Set() call.
virtual bool Reset() = 0;
// Puts the calling thread into a wait state. The thread may be released // Puts the calling thread into a wait state. The thread may be released
// by a Set() call depending on if other threads are waiting and if so on // by a Set() call depending on if other threads are waiting and if so on
// timing. The thread that was released will call Reset() before leaving // timing. The thread that was released will call Reset() before leaving

View File

@ -92,15 +92,6 @@ EventPosix::~EventPosix() {
pthread_mutex_destroy(&mutex_); pthread_mutex_destroy(&mutex_);
} }
bool EventPosix::Reset() {
if (0 != pthread_mutex_lock(&mutex_)) {
return false;
}
state_ = kDown;
pthread_mutex_unlock(&mutex_);
return true;
}
bool EventPosix::Set() { bool EventPosix::Set() {
if (0 != pthread_mutex_lock(&mutex_)) { if (0 != pthread_mutex_lock(&mutex_)) {
return false; return false;

View File

@ -33,7 +33,6 @@ class EventPosix : public EventWrapper {
virtual EventTypeWrapper Wait(unsigned long max_time) OVERRIDE; virtual EventTypeWrapper Wait(unsigned long max_time) OVERRIDE;
virtual bool Set() OVERRIDE; virtual bool Set() OVERRIDE;
virtual bool Reset() OVERRIDE;
virtual bool StartTimer(bool periodic, unsigned long time) OVERRIDE; virtual bool StartTimer(bool periodic, unsigned long time) OVERRIDE;
virtual bool StopTimer() OVERRIDE; virtual bool StopTimer() OVERRIDE;

View File

@ -32,10 +32,6 @@ bool EventWindows::Set() {
return SetEvent(event_) == 1; return SetEvent(event_) == 1;
} }
bool EventWindows::Reset() {
return ResetEvent(event_) == 1;
}
EventTypeWrapper EventWindows::Wait(unsigned long max_time) { EventTypeWrapper EventWindows::Wait(unsigned long max_time) {
unsigned long res = WaitForSingleObject(event_, max_time); unsigned long res = WaitForSingleObject(event_, max_time);
switch (res) { switch (res) {

View File

@ -26,7 +26,6 @@ class EventWindows : public EventWrapper {
virtual EventTypeWrapper Wait(unsigned long max_time); virtual EventTypeWrapper Wait(unsigned long max_time);
virtual bool Set(); virtual bool Set();
virtual bool Reset();
virtual bool StartTimer(bool periodic, unsigned long time); virtual bool StartTimer(bool periodic, unsigned long time);
virtual bool StopTimer(); virtual bool StopTimer();

View File

@ -82,7 +82,6 @@ bool DirectTransport::SendPackets() {
if (wait_time_ms > 0) { if (wait_time_ms > 0) {
switch (packet_event_->Wait(static_cast<unsigned long>(wait_time_ms))) { switch (packet_event_->Wait(static_cast<unsigned long>(wait_time_ms))) {
case kEventSignaled: case kEventSignaled:
packet_event_->Reset();
break; break;
case kEventTimeout: case kEventTimeout:
break; break;

View File

@ -47,7 +47,6 @@ class RtpRtcpObserver {
virtual EventTypeWrapper Wait() { virtual EventTypeWrapper Wait() {
EventTypeWrapper result = observation_complete_->Wait(timeout_ms_); EventTypeWrapper result = observation_complete_->Wait(timeout_ms_);
observation_complete_->Reset();
return result; return result;
} }

View File

@ -2120,40 +2120,24 @@ TEST_F(EndToEndTest, RespectsNetworkState) {
FakeEncoder(Clock::GetRealTimeClock()), FakeEncoder(Clock::GetRealTimeClock()),
test_crit_(CriticalSectionWrapper::CreateCriticalSection()), test_crit_(CriticalSectionWrapper::CreateCriticalSection()),
encoded_frames_(EventWrapper::Create()), encoded_frames_(EventWrapper::Create()),
sender_packets_(EventWrapper::Create()), packet_event_(EventWrapper::Create()),
receiver_packets_(EventWrapper::Create()),
sender_state_(Call::kNetworkUp), sender_state_(Call::kNetworkUp),
down_sender_rtp_(0), sender_rtp_(0),
down_sender_rtcp_(0), sender_rtcp_(0),
receiver_state_(Call::kNetworkUp), receiver_rtcp_(0),
down_receiver_rtcp_(0),
down_frames_(0) {} down_frames_(0) {}
virtual Action OnSendRtp(const uint8_t* packet, size_t length) override { virtual Action OnSendRtp(const uint8_t* packet, size_t length) override {
CriticalSectionScoped lock(test_crit_.get()); CriticalSectionScoped lock(test_crit_.get());
if (sender_state_ == Call::kNetworkDown) { ++sender_rtp_;
++down_sender_rtp_; packet_event_->Set();
EXPECT_LE(down_sender_rtp_, kNumAcceptedDowntimeRtp)
<< "RTP sent during sender-side downtime.";
if (down_sender_rtp_> kNumAcceptedDowntimeRtp)
sender_packets_->Set();
} else {
sender_packets_->Set();
}
return SEND_PACKET; return SEND_PACKET;
} }
virtual Action OnSendRtcp(const uint8_t* packet, size_t length) override { virtual Action OnSendRtcp(const uint8_t* packet, size_t length) override {
CriticalSectionScoped lock(test_crit_.get()); CriticalSectionScoped lock(test_crit_.get());
if (sender_state_ == Call::kNetworkDown) { ++sender_rtcp_;
++down_sender_rtcp_; packet_event_->Set();
EXPECT_LE(down_sender_rtcp_, kNumAcceptedDowntimeRtcp)
<< "RTCP sent during sender-side downtime.";
if (down_sender_rtcp_ > kNumAcceptedDowntimeRtcp)
sender_packets_->Set();
} else {
sender_packets_->Set();
}
return SEND_PACKET; return SEND_PACKET;
} }
@ -2165,15 +2149,8 @@ TEST_F(EndToEndTest, RespectsNetworkState) {
virtual Action OnReceiveRtcp(const uint8_t* packet, virtual Action OnReceiveRtcp(const uint8_t* packet,
size_t length) override { size_t length) override {
CriticalSectionScoped lock(test_crit_.get()); CriticalSectionScoped lock(test_crit_.get());
if (receiver_state_ == Call::kNetworkDown) { ++receiver_rtcp_;
++down_receiver_rtcp_; packet_event_->Set();
EXPECT_LE(down_receiver_rtcp_, kNumAcceptedDowntimeRtcp)
<< "RTCP sent during receiver-side downtime.";
if (down_receiver_rtcp_ > kNumAcceptedDowntimeRtcp)
receiver_packets_->Set();
} else {
receiver_packets_->Set();
}
return SEND_PACKET; return SEND_PACKET;
} }
@ -2193,45 +2170,33 @@ TEST_F(EndToEndTest, RespectsNetworkState) {
virtual void PerformTest() override { virtual void PerformTest() override {
EXPECT_EQ(kEventSignaled, encoded_frames_->Wait(kDefaultTimeoutMs)) EXPECT_EQ(kEventSignaled, encoded_frames_->Wait(kDefaultTimeoutMs))
<< "No frames received by the encoder."; << "No frames received by the encoder.";
EXPECT_EQ(kEventSignaled, sender_packets_->Wait(kDefaultTimeoutMs)) // Wait for packets from both sender/receiver.
<< "Timed out waiting for send-side packets."; WaitForPacketsOrSilence(false, false);
EXPECT_EQ(kEventSignaled, receiver_packets_->Wait(kDefaultTimeoutMs))
<< "Timed out waiting for receiver-side packets.";
// Sender-side network down. // Sender-side network down.
sender_call_->SignalNetworkState(Call::kNetworkDown); sender_call_->SignalNetworkState(Call::kNetworkDown);
{ {
CriticalSectionScoped lock(test_crit_.get()); CriticalSectionScoped lock(test_crit_.get());
sender_packets_->Reset(); // Earlier packets should not count. // After network goes down we shouldn't be encoding more frames.
sender_state_ = Call::kNetworkDown; sender_state_ = Call::kNetworkDown;
} }
EXPECT_EQ(kEventTimeout, sender_packets_->Wait(kSilenceTimeoutMs)) // Wait for receiver-packets and no sender packets.
<< "Packets sent during sender-network downtime."; WaitForPacketsOrSilence(true, false);
EXPECT_EQ(kEventSignaled, receiver_packets_->Wait(kDefaultTimeoutMs))
<< "Timed out waiting for receiver-side packets.";
// Receiver-side network down. // Receiver-side network down.
receiver_call_->SignalNetworkState(Call::kNetworkDown); receiver_call_->SignalNetworkState(Call::kNetworkDown);
{ WaitForPacketsOrSilence(true, true);
CriticalSectionScoped lock(test_crit_.get());
receiver_packets_->Reset(); // Earlier packets should not count.
receiver_state_ = Call::kNetworkDown;
}
EXPECT_EQ(kEventTimeout, receiver_packets_->Wait(kSilenceTimeoutMs))
<< "Packets sent during receiver-network downtime.";
// Network back up again for both. // Network back up again for both.
{ {
CriticalSectionScoped lock(test_crit_.get()); CriticalSectionScoped lock(test_crit_.get());
sender_packets_->Reset(); // Earlier packets should not count. // It's OK to encode frames again, as we're about to bring up the
receiver_packets_->Reset(); // Earlier packets should not count. // network.
sender_state_ = receiver_state_ = Call::kNetworkUp; sender_state_ = Call::kNetworkUp;
} }
sender_call_->SignalNetworkState(Call::kNetworkUp); sender_call_->SignalNetworkState(Call::kNetworkUp);
receiver_call_->SignalNetworkState(Call::kNetworkUp); receiver_call_->SignalNetworkState(Call::kNetworkUp);
EXPECT_EQ(kEventSignaled, sender_packets_->Wait(kDefaultTimeoutMs)) WaitForPacketsOrSilence(false, false);
<< "Timed out waiting for send-side packets.";
EXPECT_EQ(kEventSignaled, receiver_packets_->Wait(kDefaultTimeoutMs))
<< "Timed out waiting for receiver-side packets.";
} }
virtual int32_t Encode( virtual int32_t Encode(
@ -2255,17 +2220,61 @@ TEST_F(EndToEndTest, RespectsNetworkState) {
} }
private: private:
void WaitForPacketsOrSilence(bool sender_down, bool receiver_down) {
int64_t initial_time_ms = clock_->TimeInMilliseconds();
int initial_sender_rtp;
int initial_sender_rtcp;
int initial_receiver_rtcp;
{
CriticalSectionScoped lock(test_crit_.get());
initial_sender_rtp = sender_rtp_;
initial_sender_rtcp = sender_rtcp_;
initial_receiver_rtcp = receiver_rtcp_;
}
bool sender_done = false;
bool receiver_done = false;
while(!sender_done || !receiver_done) {
packet_event_->Wait(kSilenceTimeoutMs);
int64_t time_now_ms = clock_->TimeInMilliseconds();
CriticalSectionScoped lock(test_crit_.get());
if (sender_down) {
ASSERT_LE(sender_rtp_ - initial_sender_rtp, kNumAcceptedDowntimeRtp)
<< "RTP sent during sender-side downtime.";
ASSERT_LE(sender_rtcp_ - initial_sender_rtcp,
kNumAcceptedDowntimeRtcp)
<< "RTCP sent during sender-side downtime.";
if (time_now_ms - initial_time_ms >=
static_cast<int64_t>(kSilenceTimeoutMs)) {
sender_done = true;
}
} else {
if (sender_rtp_ > initial_sender_rtp)
sender_done = true;
}
if (receiver_down) {
ASSERT_LE(receiver_rtcp_ - initial_receiver_rtcp,
kNumAcceptedDowntimeRtcp)
<< "RTCP sent during receiver-side downtime.";
if (time_now_ms - initial_time_ms >=
static_cast<int64_t>(kSilenceTimeoutMs)) {
receiver_done = true;
}
} else {
if (receiver_rtcp_ > initial_receiver_rtcp)
receiver_done = true;
}
}
}
const scoped_ptr<CriticalSectionWrapper> test_crit_; const scoped_ptr<CriticalSectionWrapper> test_crit_;
scoped_ptr<EventWrapper> encoded_frames_; const scoped_ptr<EventWrapper> encoded_frames_;
scoped_ptr<EventWrapper> sender_packets_; const scoped_ptr<EventWrapper> packet_event_;
scoped_ptr<EventWrapper> receiver_packets_;
Call* sender_call_; Call* sender_call_;
Call* receiver_call_; Call* receiver_call_;
Call::NetworkState sender_state_ GUARDED_BY(test_crit_); Call::NetworkState sender_state_ GUARDED_BY(test_crit_);
int down_sender_rtp_ GUARDED_BY(test_crit_); int sender_rtp_ GUARDED_BY(test_crit_);
int down_sender_rtcp_ GUARDED_BY(test_crit_); int sender_rtcp_ GUARDED_BY(test_crit_);
Call::NetworkState receiver_state_ GUARDED_BY(test_crit_); int receiver_rtcp_ GUARDED_BY(test_crit_);
int down_receiver_rtcp_ GUARDED_BY(test_crit_);
int down_frames_ GUARDED_BY(test_crit_); int down_frames_ GUARDED_BY(test_crit_);
} test; } test;

View File

@ -78,8 +78,10 @@ class LoopBackTransport : public webrtc::Transport {
void StorePacket(Packet::Type type, int channel, void StorePacket(Packet::Type type, int channel,
const void* data, const void* data,
size_t len) { size_t len) {
{
webrtc::CriticalSectionScoped lock(crit_.get()); webrtc::CriticalSectionScoped lock(crit_.get());
packet_queue_.push_back(Packet(type, channel, data, len)); packet_queue_.push_back(Packet(type, channel, data, len));
}
packet_event_->Set(); packet_event_->Set();
} }
@ -90,7 +92,6 @@ class LoopBackTransport : public webrtc::Transport {
bool SendPackets() { bool SendPackets() {
switch (packet_event_->Wait(10)) { switch (packet_event_->Wait(10)) {
case webrtc::kEventSignaled: case webrtc::kEventSignaled:
packet_event_->Reset();
break; break;
case webrtc::kEventTimeout: case webrtc::kEventTimeout:
break; break;
@ -123,9 +124,9 @@ class LoopBackTransport : public webrtc::Transport {
return true; return true;
} }
webrtc::scoped_ptr<webrtc::CriticalSectionWrapper> crit_; const webrtc::scoped_ptr<webrtc::CriticalSectionWrapper> crit_;
webrtc::scoped_ptr<webrtc::EventWrapper> packet_event_; const webrtc::scoped_ptr<webrtc::EventWrapper> packet_event_;
webrtc::scoped_ptr<webrtc::ThreadWrapper> thread_; const webrtc::scoped_ptr<webrtc::ThreadWrapper> thread_;
std::deque<Packet> packet_queue_ GUARDED_BY(crit_.get()); std::deque<Packet> packet_queue_ GUARDED_BY(crit_.get());
webrtc::VoENetwork* const voe_network_; webrtc::VoENetwork* const voe_network_;
webrtc::Atomic32 transmitted_packets_; webrtc::Atomic32 transmitted_packets_;

View File

@ -29,7 +29,6 @@ class TestRtpObserver : public webrtc::VoERTPObserver {
void WaitForChangedSsrc() { void WaitForChangedSsrc() {
// 10 seconds should be enough. // 10 seconds should be enough.
EXPECT_EQ(voetest::kEventSignaled, changed_ssrc_event_->Wait(10*1000)); EXPECT_EQ(voetest::kEventSignaled, changed_ssrc_event_->Wait(10*1000));
changed_ssrc_event_->Reset();
} }
void SetIncomingSsrc(unsigned int ssrc) { void SetIncomingSsrc(unsigned int ssrc) {
voetest::CriticalSectionScoped lock(crit_.get()); voetest::CriticalSectionScoped lock(crit_.get());