Potential dead lock in receive statistics

A dead lock could occur if the following to code paths are called
concurrently:

ReceiveStatisticsImpl::IncomingPacket() ->
  StreamStatisticianImpl::IncomingPacket()

StreamStatisticianImpl::GetStatistics() ->
  ReceiveStatisticsImpl::StatisticsUpdated()

Solution is to release ReceiveStatisticsImpl lock after lookup/lazy-init of StreamStatisticianImpl. Don't need to hold it when doing the call to StreamStatisticianImpl::IncomingPacket().

BUG=2818
R=asapersson@webrtc.org, stefan@webrtc.org

Review URL: https://webrtc-codereview.appspot.com/7389004

git-svn-id: http://webrtc.googlecode.com/svn/trunk@5406 4adac7df-926f-26a2-2b94-8c16560cd09d
This commit is contained in:
sprang@webrtc.org 2014-01-21 16:33:37 +00:00
parent 32c3247418
commit 7dba27c740
2 changed files with 69 additions and 56 deletions

View File

@ -28,7 +28,7 @@ StreamStatisticianImpl::StreamStatisticianImpl(
Clock* clock,
RtcpStatisticsCallback* rtcp_callback)
: clock_(clock),
crit_sect_(CriticalSectionWrapper::CreateCriticalSection()),
stream_lock_(CriticalSectionWrapper::CreateCriticalSection()),
incoming_bitrate_(clock, NULL),
ssrc_(0),
max_reordering_threshold_(kDefaultMaxReorderingThreshold),
@ -55,7 +55,7 @@ StreamStatisticianImpl::StreamStatisticianImpl(
rtcp_callback_(rtcp_callback) {}
void StreamStatisticianImpl::ResetStatistics() {
CriticalSectionScoped cs(crit_sect_.get());
CriticalSectionScoped cs(stream_lock_.get());
last_report_inorder_packets_ = 0;
last_report_old_packets_ = 0;
last_report_seq_max_ = 0;
@ -75,7 +75,7 @@ void StreamStatisticianImpl::ResetStatistics() {
void StreamStatisticianImpl::IncomingPacket(const RTPHeader& header,
size_t bytes,
bool retransmitted) {
CriticalSectionScoped cs(crit_sect_.get());
CriticalSectionScoped cs(stream_lock_.get());
bool in_order = InOrderPacketInternal(header.sequenceNumber);
ssrc_ = header.ssrc;
incoming_bitrate_.Update(bytes);
@ -170,13 +170,14 @@ void StreamStatisticianImpl::IncomingPacket(const RTPHeader& header,
void StreamStatisticianImpl::SetMaxReorderingThreshold(
int max_reordering_threshold) {
CriticalSectionScoped cs(crit_sect_.get());
CriticalSectionScoped cs(stream_lock_.get());
max_reordering_threshold_ = max_reordering_threshold;
}
bool StreamStatisticianImpl::GetStatistics(RtcpStatistics* statistics,
bool reset) {
CriticalSectionScoped cs(crit_sect_.get());
{
CriticalSectionScoped cs(stream_lock_.get());
if (received_seq_first_ == 0 && received_byte_count_ == 0) {
// We have not received anything.
return false;
@ -192,6 +193,16 @@ bool StreamStatisticianImpl::GetStatistics(RtcpStatistics* statistics,
return true;
}
*statistics = CalculateStatistics();
}
rtcp_callback_->StatisticsUpdated(*statistics, ssrc_);
return true;
}
RtcpStatistics StreamStatisticianImpl::CalculateStatistics() {
RtcpStatistics stats;
if (last_report_inorder_packets_ == 0) {
// First time we send a report.
last_report_seq_max_ = received_seq_first_ - 1;
@ -233,32 +244,30 @@ bool StreamStatisticianImpl::GetStatistics(RtcpStatistics* statistics,
local_fraction_lost =
static_cast<uint8_t>(255 * missing / exp_since_last);
}
statistics->fraction_lost = local_fraction_lost;
stats.fraction_lost = local_fraction_lost;
// We need a counter for cumulative loss too.
cumulative_loss_ += missing;
statistics->cumulative_lost = cumulative_loss_;
statistics->extended_max_sequence_number = (received_seq_wraps_ << 16) +
received_seq_max_;
stats.cumulative_lost = cumulative_loss_;
stats.extended_max_sequence_number =
(received_seq_wraps_ << 16) + received_seq_max_;
// Note: internal jitter value is in Q4 and needs to be scaled by 1/16.
statistics->jitter = jitter_q4_ >> 4;
if (reset) {
stats.jitter = jitter_q4_ >> 4;
// Store this report.
last_reported_statistics_ = *statistics;
last_reported_statistics_ = stats;
// Only for report blocks in RTCP SR and RR.
last_report_inorder_packets_ = received_inorder_packet_count_;
last_report_old_packets_ = received_retransmitted_packets_;
last_report_seq_max_ = received_seq_max_;
}
rtcp_callback_->StatisticsUpdated(last_reported_statistics_, ssrc_);
return true;
return stats;
}
void StreamStatisticianImpl::GetDataCounters(
uint32_t* bytes_received, uint32_t* packets_received) const {
CriticalSectionScoped cs(crit_sect_.get());
CriticalSectionScoped cs(stream_lock_.get());
if (bytes_received) {
*bytes_received = received_byte_count_;
}
@ -269,25 +278,25 @@ void StreamStatisticianImpl::GetDataCounters(
}
uint32_t StreamStatisticianImpl::BitrateReceived() const {
CriticalSectionScoped cs(crit_sect_.get());
CriticalSectionScoped cs(stream_lock_.get());
return incoming_bitrate_.BitrateNow();
}
void StreamStatisticianImpl::ProcessBitrate() {
CriticalSectionScoped cs(crit_sect_.get());
CriticalSectionScoped cs(stream_lock_.get());
incoming_bitrate_.Process();
}
void StreamStatisticianImpl::LastReceiveTimeNtp(uint32_t* secs,
uint32_t* frac) const {
CriticalSectionScoped cs(crit_sect_.get());
CriticalSectionScoped cs(stream_lock_.get());
*secs = last_receive_time_secs_;
*frac = last_receive_time_frac_;
}
bool StreamStatisticianImpl::IsRetransmitOfOldPacket(
const RTPHeader& header, int min_rtt) const {
CriticalSectionScoped cs(crit_sect_.get());
CriticalSectionScoped cs(stream_lock_.get());
if (InOrderPacketInternal(header.sequenceNumber)) {
return false;
}
@ -322,7 +331,7 @@ bool StreamStatisticianImpl::IsRetransmitOfOldPacket(
}
bool StreamStatisticianImpl::IsPacketInOrder(uint16_t sequence_number) const {
CriticalSectionScoped cs(crit_sect_.get());
CriticalSectionScoped cs(stream_lock_.get());
return InOrderPacketInternal(sequence_number);
}
@ -347,7 +356,7 @@ ReceiveStatistics* ReceiveStatistics::Create(Clock* clock) {
ReceiveStatisticsImpl::ReceiveStatisticsImpl(Clock* clock)
: clock_(clock),
crit_sect_(CriticalSectionWrapper::CreateCriticalSection()),
receive_statistics_lock_(CriticalSectionWrapper::CreateCriticalSection()),
last_rate_update_ms_(0),
rtcp_stats_callback_(NULL) {}
@ -360,19 +369,22 @@ ReceiveStatisticsImpl::~ReceiveStatisticsImpl() {
void ReceiveStatisticsImpl::IncomingPacket(const RTPHeader& header,
size_t bytes, bool old_packet) {
CriticalSectionScoped cs(crit_sect_.get());
StatisticianImplMap::iterator it = statisticians_.find(header.ssrc);
StatisticianImplMap::iterator it;
{
CriticalSectionScoped cs(receive_statistics_lock_.get());
it = statisticians_.find(header.ssrc);
if (it == statisticians_.end()) {
std::pair<StatisticianImplMap::iterator, uint32_t> insert_result =
statisticians_.insert(std::make_pair(
header.ssrc, new StreamStatisticianImpl(clock_, this)));
it = insert_result.first;
}
statisticians_[header.ssrc]->IncomingPacket(header, bytes, old_packet);
}
it->second->IncomingPacket(header, bytes, old_packet);
}
void ReceiveStatisticsImpl::ChangeSsrc(uint32_t from_ssrc, uint32_t to_ssrc) {
CriticalSectionScoped cs(crit_sect_.get());
CriticalSectionScoped cs(receive_statistics_lock_.get());
StatisticianImplMap::iterator from_it = statisticians_.find(from_ssrc);
if (from_it == statisticians_.end())
return;
@ -383,7 +395,7 @@ void ReceiveStatisticsImpl::ChangeSsrc(uint32_t from_ssrc, uint32_t to_ssrc) {
}
StatisticianMap ReceiveStatisticsImpl::GetActiveStatisticians() const {
CriticalSectionScoped cs(crit_sect_.get());
CriticalSectionScoped cs(receive_statistics_lock_.get());
StatisticianMap active_statisticians;
for (StatisticianImplMap::const_iterator it = statisticians_.begin();
it != statisticians_.end(); ++it) {
@ -400,7 +412,7 @@ StatisticianMap ReceiveStatisticsImpl::GetActiveStatisticians() const {
StreamStatistician* ReceiveStatisticsImpl::GetStatistician(
uint32_t ssrc) const {
CriticalSectionScoped cs(crit_sect_.get());
CriticalSectionScoped cs(receive_statistics_lock_.get());
StatisticianImplMap::const_iterator it = statisticians_.find(ssrc);
if (it == statisticians_.end())
return NULL;
@ -409,7 +421,7 @@ StreamStatistician* ReceiveStatisticsImpl::GetStatistician(
void ReceiveStatisticsImpl::SetMaxReorderingThreshold(
int max_reordering_threshold) {
CriticalSectionScoped cs(crit_sect_.get());
CriticalSectionScoped cs(receive_statistics_lock_.get());
for (StatisticianImplMap::iterator it = statisticians_.begin();
it != statisticians_.end(); ++it) {
it->second->SetMaxReorderingThreshold(max_reordering_threshold);
@ -417,7 +429,7 @@ void ReceiveStatisticsImpl::SetMaxReorderingThreshold(
}
int32_t ReceiveStatisticsImpl::Process() {
CriticalSectionScoped cs(crit_sect_.get());
CriticalSectionScoped cs(receive_statistics_lock_.get());
for (StatisticianImplMap::iterator it = statisticians_.begin();
it != statisticians_.end(); ++it) {
it->second->ProcessBitrate();
@ -427,7 +439,7 @@ int32_t ReceiveStatisticsImpl::Process() {
}
int32_t ReceiveStatisticsImpl::TimeUntilNextProcess() {
CriticalSectionScoped cs(crit_sect_.get());
CriticalSectionScoped cs(receive_statistics_lock_.get());
int time_since_last_update = clock_->TimeInMilliseconds() -
last_rate_update_ms_;
return std::max(kStatisticsProcessIntervalMs - time_since_last_update, 0);
@ -435,7 +447,7 @@ int32_t ReceiveStatisticsImpl::TimeUntilNextProcess() {
void ReceiveStatisticsImpl::RegisterRtcpStatisticsCallback(
RtcpStatisticsCallback* callback) {
CriticalSectionScoped cs(crit_sect_.get());
CriticalSectionScoped cs(receive_statistics_lock_.get());
if (callback != NULL)
assert(rtcp_stats_callback_ == NULL);
rtcp_stats_callback_ = callback;
@ -443,7 +455,7 @@ void ReceiveStatisticsImpl::RegisterRtcpStatisticsCallback(
void ReceiveStatisticsImpl::StatisticsUpdated(const RtcpStatistics& statistics,
uint32_t ssrc) {
CriticalSectionScoped cs(crit_sect_.get());
CriticalSectionScoped cs(receive_statistics_lock_.get());
if (rtcp_stats_callback_) {
rtcp_stats_callback_->StatisticsUpdated(statistics, ssrc);
}

View File

@ -45,9 +45,10 @@ class StreamStatisticianImpl : public StreamStatistician {
private:
bool InOrderPacketInternal(uint16_t sequence_number) const;
RtcpStatistics CalculateStatistics();
Clock* clock_;
scoped_ptr<CriticalSectionWrapper> crit_sect_;
scoped_ptr<CriticalSectionWrapper> stream_lock_;
Bitrate incoming_bitrate_;
uint32_t ssrc_;
int max_reordering_threshold_; // In number of packets or sequence numbers.
@ -112,7 +113,7 @@ class ReceiveStatisticsImpl : public ReceiveStatistics,
typedef std::map<uint32_t, StreamStatisticianImpl*> StatisticianImplMap;
Clock* clock_;
scoped_ptr<CriticalSectionWrapper> crit_sect_;
scoped_ptr<CriticalSectionWrapper> receive_statistics_lock_;
int64_t last_rate_update_ms_;
StatisticianImplMap statisticians_;