Fix bug in Transport where channel_.clear() was being called without a lock.

Looks like this snuck in between misaligned braces.

Also switching to C++11 for loops, reducing lock scopes in a few places and removing locks in others.

BUG=4444
R=pthatcher@webrtc.org

Review URL: https://webrtc-codereview.appspot.com/43769004

Cr-Commit-Position: refs/heads/master@{#8765}
git-svn-id: http://webrtc.googlecode.com/svn/trunk@8765 4adac7df-926f-26a2-2b94-8c16560cd09d
This commit is contained in:
tommi@webrtc.org 2015-03-17 21:40:06 +00:00
parent b493cb4497
commit 462dbcfc2a
2 changed files with 96 additions and 81 deletions

View File

@ -197,21 +197,29 @@ TransportChannelImpl* Transport::CreateChannel(int component) {
TransportChannelImpl* Transport::CreateChannel_w(int component) { TransportChannelImpl* Transport::CreateChannel_w(int component) {
ASSERT(worker_thread()->IsCurrent()); ASSERT(worker_thread()->IsCurrent());
TransportChannelImpl *impl; TransportChannelImpl* impl;
// TODO(tommi): We don't really need to grab the lock until the actual call
// to insert() below and presumably hold it throughout initialization of
// |impl| after the impl_exists check. Maybe we can factor that out to
// a separate function and not grab the lock in this function.
// Actually, we probably don't need to hold the lock while initializing
// |impl| since we can just do the insert when that's done.
rtc::CritScope cs(&crit_); rtc::CritScope cs(&crit_);
// Create the entry if it does not exist. // Create the entry if it does not exist.
bool impl_exists = false; bool impl_exists = false;
if (channels_.find(component) == channels_.end()) { auto iterator = channels_.find(component);
if (iterator == channels_.end()) {
impl = CreateTransportChannel(component); impl = CreateTransportChannel(component);
channels_[component] = ChannelMapEntry(impl); iterator = channels_.insert(std::pair<int, ChannelMapEntry>(
component, ChannelMapEntry(impl))).first;
} else { } else {
impl = channels_[component].get(); impl = iterator->second.get();
impl_exists = true; impl_exists = true;
} }
// Increase the ref count. // Increase the ref count.
channels_[component].AddRef(); iterator->second.AddRef();
destroyed_ = false; destroyed_ = false;
if (impl_exists) { if (impl_exists) {
@ -256,6 +264,11 @@ TransportChannelImpl* Transport::CreateChannel_w(int component) {
} }
TransportChannelImpl* Transport::GetChannel(int component) { TransportChannelImpl* Transport::GetChannel(int component) {
// TODO(tommi,pthatcher): Since we're returning a pointer from the channels_
// map, shouldn't we assume that we're on the worker thread? (The pointer
// will be used outside of the lock).
// And if we're on the worker thread, which is the only thread that modifies
// channels_, can we skip grabbing the lock?
rtc::CritScope cs(&crit_); rtc::CritScope cs(&crit_);
ChannelMap::iterator iter = channels_.find(component); ChannelMap::iterator iter = channels_.find(component);
return (iter != channels_.end()) ? iter->second.get() : NULL; return (iter != channels_.end()) ? iter->second.get() : NULL;
@ -274,19 +287,18 @@ void Transport::DestroyChannel(int component) {
void Transport::DestroyChannel_w(int component) { void Transport::DestroyChannel_w(int component) {
ASSERT(worker_thread()->IsCurrent()); ASSERT(worker_thread()->IsCurrent());
TransportChannelImpl* impl = NULL;
{
rtc::CritScope cs(&crit_);
ChannelMap::iterator iter = channels_.find(component); ChannelMap::iterator iter = channels_.find(component);
if (iter == channels_.end()) if (iter == channels_.end())
return; return;
TransportChannelImpl* impl = NULL;
iter->second.DecRef(); iter->second.DecRef();
if (!iter->second.ref()) { if (!iter->second.ref()) {
impl = iter->second.get(); impl = iter->second.get();
rtc::CritScope cs(&crit_);
channels_.erase(iter); channels_.erase(iter);
} }
}
if (connect_requested_ && channels_.empty()) { if (connect_requested_ && channels_.empty()) {
// We're no longer attempting to connect. // We're no longer attempting to connect.
@ -309,9 +321,9 @@ void Transport::ConnectChannels_w() {
ASSERT(worker_thread()->IsCurrent()); ASSERT(worker_thread()->IsCurrent());
if (connect_requested_ || channels_.empty()) if (connect_requested_ || channels_.empty())
return; return;
connect_requested_ = true; connect_requested_ = true;
signaling_thread()->Post( signaling_thread()->Post(this, MSG_CANDIDATEREADY, NULL);
this, MSG_CANDIDATEREADY, NULL);
if (!local_description_) { if (!local_description_) {
// TOOD(mallinath) : TransportDescription(TD) shouldn't be generated here. // TOOD(mallinath) : TransportDescription(TD) shouldn't be generated here.
@ -343,8 +355,7 @@ void Transport::OnConnecting_s() {
void Transport::DestroyAllChannels() { void Transport::DestroyAllChannels() {
ASSERT(signaling_thread()->IsCurrent()); ASSERT(signaling_thread()->IsCurrent());
worker_thread_->Invoke<void>( worker_thread_->Invoke<void>(Bind(&Transport::DestroyAllChannels_w, this));
Bind(&Transport::DestroyAllChannels_w, this));
worker_thread()->Clear(this); worker_thread()->Clear(this);
signaling_thread()->Clear(this); signaling_thread()->Clear(this);
destroyed_ = true; destroyed_ = true;
@ -352,19 +363,18 @@ void Transport::DestroyAllChannels() {
void Transport::DestroyAllChannels_w() { void Transport::DestroyAllChannels_w() {
ASSERT(worker_thread()->IsCurrent()); ASSERT(worker_thread()->IsCurrent());
std::vector<TransportChannelImpl*> impls; std::vector<TransportChannelImpl*> impls;
for (auto& iter : channels_) {
iter.second.DecRef();
if (!iter.second.ref())
impls.push_back(iter.second.get());
}
{ {
rtc::CritScope cs(&crit_); rtc::CritScope cs(&crit_);
for (ChannelMap::iterator iter = channels_.begin();
iter != channels_.end();
++iter) {
iter->second.DecRef();
if (!iter->second.ref())
impls.push_back(iter->second.get());
}
}
channels_.clear(); channels_.clear();
}
for (size_t i = 0; i < impls.size(); ++i) for (size_t i = 0; i < impls.size(); ++i)
DestroyTransportChannel(impls[i]); DestroyTransportChannel(impls[i]);
@ -401,11 +411,8 @@ void Transport::OnSignalingReady() {
void Transport::CallChannels_w(TransportChannelFunc func) { void Transport::CallChannels_w(TransportChannelFunc func) {
ASSERT(worker_thread()->IsCurrent()); ASSERT(worker_thread()->IsCurrent());
rtc::CritScope cs(&crit_); for (const auto& iter : channels_) {
for (ChannelMap::iterator iter = channels_.begin(); ((iter.second.get())->*func)();
iter != channels_.end();
++iter) {
((iter->second.get())->*func)();
} }
} }
@ -451,10 +458,8 @@ bool Transport::GetStats_w(TransportStats* stats) {
ASSERT(worker_thread()->IsCurrent()); ASSERT(worker_thread()->IsCurrent());
stats->content_name = content_name(); stats->content_name = content_name();
stats->channel_stats.clear(); stats->channel_stats.clear();
for (ChannelMap::iterator iter = channels_.begin(); for (auto iter : channels_) {
iter != channels_.end(); ChannelMapEntry& entry = iter.second;
++iter) {
ChannelMapEntry& entry = iter->second;
TransportChannelStats substats; TransportChannelStats substats;
substats.component = entry->component(); substats.component = entry->component();
entry->GetSrtpCipher(&substats.srtp_cipher); entry->GetSrtpCipher(&substats.srtp_cipher);
@ -536,24 +541,24 @@ void Transport::OnChannelWritableState_s() {
TransportState Transport::GetTransportState_s(bool read) { TransportState Transport::GetTransportState_s(bool read) {
ASSERT(signaling_thread()->IsCurrent()); ASSERT(signaling_thread()->IsCurrent());
rtc::CritScope cs(&crit_); rtc::CritScope cs(&crit_);
bool any = false; bool any = false;
bool all = !channels_.empty(); bool all = !channels_.empty();
for (ChannelMap::iterator iter = channels_.begin(); for (const auto iter : channels_) {
iter != channels_.end(); bool b = (read ? iter.second->readable() :
++iter) { iter.second->writable());
bool b = (read ? iter->second->readable() : any |= b;
iter->second->writable()); all &= b;
any = any || b;
all = all && b;
} }
if (all) { if (all) {
return TRANSPORT_STATE_ALL; return TRANSPORT_STATE_ALL;
} else if (any) { } else if (any) {
return TRANSPORT_STATE_SOME; return TRANSPORT_STATE_SOME;
} else {
return TRANSPORT_STATE_NONE;
} }
return TRANSPORT_STATE_NONE;
} }
void Transport::OnChannelRequestSignaling(TransportChannelImpl* channel) { void Transport::OnChannelRequestSignaling(TransportChannelImpl* channel) {
@ -566,13 +571,16 @@ void Transport::OnChannelRequestSignaling_s(int component) {
ASSERT(signaling_thread()->IsCurrent()); ASSERT(signaling_thread()->IsCurrent());
LOG(LS_INFO) << "Transport: " << content_name_ << ", allocating candidates"; LOG(LS_INFO) << "Transport: " << content_name_ << ", allocating candidates";
// Resetting ICE state for the channel. // Resetting ICE state for the channel.
{ worker_thread_->Invoke<void>(
rtc::CritScope cs(&crit_); Bind(&Transport::OnChannelRequestSignaling_w, this, component));
SignalRequestSignaling(this);
}
void Transport::OnChannelRequestSignaling_w(int component) {
ASSERT(worker_thread()->IsCurrent());
ChannelMap::iterator iter = channels_.find(component); ChannelMap::iterator iter = channels_.find(component);
if (iter != channels_.end()) if (iter != channels_.end())
iter->second.set_candidates_allocated(false); iter->second.set_candidates_allocated(false);
}
SignalRequestSignaling(this);
} }
void Transport::OnChannelCandidateReady(TransportChannelImpl* channel, void Transport::OnChannelCandidateReady(TransportChannelImpl* channel,
@ -622,18 +630,18 @@ void Transport::OnChannelRouteChange_s(const TransportChannel* channel,
void Transport::OnChannelCandidatesAllocationDone( void Transport::OnChannelCandidatesAllocationDone(
TransportChannelImpl* channel) { TransportChannelImpl* channel) {
ASSERT(worker_thread()->IsCurrent()); ASSERT(worker_thread()->IsCurrent());
rtc::CritScope cs(&crit_);
ChannelMap::iterator iter = channels_.find(channel->component()); ChannelMap::iterator iter = channels_.find(channel->component());
ASSERT(iter != channels_.end()); ASSERT(iter != channels_.end());
LOG(LS_INFO) << "Transport: " << content_name_ << ", component " LOG(LS_INFO) << "Transport: " << content_name_ << ", component "
<< channel->component() << " allocation complete"; << channel->component() << " allocation complete";
iter->second.set_candidates_allocated(true); iter->second.set_candidates_allocated(true);
// If all channels belonging to this Transport got signal, then // If all channels belonging to this Transport got signal, then
// forward this signal to upper layer. // forward this signal to upper layer.
// Can this signal arrive before all transport channels are created? // Can this signal arrive before all transport channels are created?
for (iter = channels_.begin(); iter != channels_.end(); ++iter) { for (auto& iter : channels_) {
if (!iter->second.candidates_allocated()) if (!iter.second.candidates_allocated())
return; return;
} }
signaling_thread_->Post(this, MSG_CANDIDATEALLOCATIONCOMPLETE); signaling_thread_->Post(this, MSG_CANDIDATEALLOCATIONCOMPLETE);
@ -680,20 +688,19 @@ void Transport::MaybeCompleted_w() {
// When there is no channel created yet, calling this function could fire an // When there is no channel created yet, calling this function could fire an
// IceConnectionCompleted event prematurely. // IceConnectionCompleted event prematurely.
if (channels_.size() == 0) { if (channels_.empty()) {
return; return;
} }
// A Transport's ICE process is completed if all of its channels are writable, // A Transport's ICE process is completed if all of its channels are writable,
// have finished allocating candidates, and have pruned all but one of their // have finished allocating candidates, and have pruned all but one of their
// connections. // connections.
ChannelMap::const_iterator iter; for (const auto& iter : channels_) {
for (iter = channels_.begin(); iter != channels_.end(); ++iter) { const TransportChannelImpl* channel = iter.second.get();
const TransportChannelImpl* channel = iter->second.get();
if (!(channel->writable() && if (!(channel->writable() &&
channel->GetState() == TransportChannelState::STATE_COMPLETED && channel->GetState() == TransportChannelState::STATE_COMPLETED &&
channel->GetIceRole() == ICEROLE_CONTROLLING && channel->GetIceRole() == ICEROLE_CONTROLLING &&
iter->second.candidates_allocated())) { iter.second.candidates_allocated())) {
return; return;
} }
} }
@ -702,21 +709,20 @@ void Transport::MaybeCompleted_w() {
} }
void Transport::SetIceRole_w(IceRole role) { void Transport::SetIceRole_w(IceRole role) {
ASSERT(worker_thread()->IsCurrent());
rtc::CritScope cs(&crit_); rtc::CritScope cs(&crit_);
ice_role_ = role; ice_role_ = role;
for (ChannelMap::iterator iter = channels_.begin(); for (auto& iter : channels_) {
iter != channels_.end(); ++iter) { iter.second->SetIceRole(ice_role_);
iter->second->SetIceRole(ice_role_);
} }
} }
void Transport::SetRemoteIceMode_w(IceMode mode) { void Transport::SetRemoteIceMode_w(IceMode mode) {
rtc::CritScope cs(&crit_); ASSERT(worker_thread()->IsCurrent());
remote_ice_mode_ = mode; remote_ice_mode_ = mode;
// Shouldn't channels be created after this method executed? // Shouldn't channels be created after this method executed?
for (ChannelMap::iterator iter = channels_.begin(); for (auto& iter : channels_) {
iter != channels_.end(); ++iter) { iter.second->SetRemoteIceMode(remote_ice_mode_);
iter->second->SetRemoteIceMode(remote_ice_mode_);
} }
} }
@ -724,14 +730,22 @@ bool Transport::SetLocalTransportDescription_w(
const TransportDescription& desc, const TransportDescription& desc,
ContentAction action, ContentAction action,
std::string* error_desc) { std::string* error_desc) {
ASSERT(worker_thread()->IsCurrent());
bool ret = true; bool ret = true;
rtc::CritScope cs(&crit_);
if (!VerifyIceParams(desc)) { if (!VerifyIceParams(desc)) {
return BadTransportDescription("Invalid ice-ufrag or ice-pwd length", return BadTransportDescription("Invalid ice-ufrag or ice-pwd length",
error_desc); error_desc);
} }
// TODO(tommi,pthatcher): I'm not sure why we need to grab this lock at this
// point. |local_description_| seems to always be modified on the worker
// thread, so we should be able to use it here without grabbing the lock.
// However, we _might_ need it before the call to reset() below?
// Actually, if we ever give out a pointer to the local description to
// another thread, won't we run into trouble? (see local_description() in
// the header file - no thread check, so I'm not sure from where it's called).
rtc::CritScope cs(&crit_);
if (local_description_ && IceCredentialsChanged(*local_description_, desc)) { if (local_description_ && IceCredentialsChanged(*local_description_, desc)) {
IceRole new_ice_role = (action == CA_OFFER) ? ICEROLE_CONTROLLING IceRole new_ice_role = (action == CA_OFFER) ? ICEROLE_CONTROLLING
: ICEROLE_CONTROLLED; : ICEROLE_CONTROLLED;
@ -743,9 +757,8 @@ bool Transport::SetLocalTransportDescription_w(
local_description_.reset(new TransportDescription(desc)); local_description_.reset(new TransportDescription(desc));
for (ChannelMap::iterator iter = channels_.begin(); for (auto& iter : channels_) {
iter != channels_.end(); ++iter) { ret &= ApplyLocalTransportDescription_w(iter.second.get(), error_desc);
ret &= ApplyLocalTransportDescription_w(iter->second.get(), error_desc);
} }
if (!ret) if (!ret)
return false; return false;
@ -762,17 +775,17 @@ bool Transport::SetRemoteTransportDescription_w(
ContentAction action, ContentAction action,
std::string* error_desc) { std::string* error_desc) {
bool ret = true; bool ret = true;
rtc::CritScope cs(&crit_);
if (!VerifyIceParams(desc)) { if (!VerifyIceParams(desc)) {
return BadTransportDescription("Invalid ice-ufrag or ice-pwd length", return BadTransportDescription("Invalid ice-ufrag or ice-pwd length",
error_desc); error_desc);
} }
// TODO(tommi,pthatcher): See todo for local_description_ above.
rtc::CritScope cs(&crit_);
remote_description_.reset(new TransportDescription(desc)); remote_description_.reset(new TransportDescription(desc));
for (ChannelMap::iterator iter = channels_.begin(); for (auto& iter : channels_) {
iter != channels_.end(); ++iter) { ret &= ApplyRemoteTransportDescription_w(iter.second.get(), error_desc);
ret &= ApplyRemoteTransportDescription_w(iter->second.get(), error_desc);
} }
// If PRANSWER/ANSWER is set, we should decide transport protocol type. // If PRANSWER/ANSWER is set, we should decide transport protocol type.
@ -784,6 +797,7 @@ bool Transport::SetRemoteTransportDescription_w(
bool Transport::ApplyLocalTransportDescription_w(TransportChannelImpl* ch, bool Transport::ApplyLocalTransportDescription_w(TransportChannelImpl* ch,
std::string* error_desc) { std::string* error_desc) {
ASSERT(worker_thread()->IsCurrent());
// If existing protocol_type is HYBRID, we may have not chosen the final // If existing protocol_type is HYBRID, we may have not chosen the final
// protocol type, so update the channel protocol type from the // protocol type, so update the channel protocol type from the
// local description. Otherwise, skip updating the protocol type. // local description. Otherwise, skip updating the protocol type.
@ -813,6 +827,7 @@ bool Transport::ApplyRemoteTransportDescription_w(TransportChannelImpl* ch,
bool Transport::ApplyNegotiatedTransportDescription_w( bool Transport::ApplyNegotiatedTransportDescription_w(
TransportChannelImpl* channel, std::string* error_desc) { TransportChannelImpl* channel, std::string* error_desc) {
ASSERT(worker_thread()->IsCurrent());
channel->SetIceProtocolType(protocol_); channel->SetIceProtocolType(protocol_);
channel->SetRemoteIceMode(remote_ice_mode_); channel->SetRemoteIceMode(remote_ice_mode_);
return true; return true;
@ -820,6 +835,7 @@ bool Transport::ApplyNegotiatedTransportDescription_w(
bool Transport::NegotiateTransportDescription_w(ContentAction local_role, bool Transport::NegotiateTransportDescription_w(ContentAction local_role,
std::string* error_desc) { std::string* error_desc) {
ASSERT(worker_thread()->IsCurrent());
// TODO(ekr@rtfm.com): This is ICE-specific stuff. Refactor into // TODO(ekr@rtfm.com): This is ICE-specific stuff. Refactor into
// P2PTransport. // P2PTransport.
const TransportDescription* offer; const TransportDescription* offer;
@ -871,10 +887,8 @@ bool Transport::NegotiateTransportDescription_w(ContentAction local_role,
// between future SetRemote/SetLocal invocations and new channel // between future SetRemote/SetLocal invocations and new channel
// creation, we have the negotiation state saved until a new // creation, we have the negotiation state saved until a new
// negotiation happens. // negotiation happens.
for (ChannelMap::iterator iter = channels_.begin(); for (auto& iter : channels_) {
iter != channels_.end(); if (!ApplyNegotiatedTransportDescription_w(iter.second.get(), error_desc))
++iter) {
if (!ApplyNegotiatedTransportDescription_w(iter->second.get(), error_desc))
return false; return false;
} }
return true; return true;

View File

@ -387,6 +387,7 @@ class Transport : public rtc::MessageHandler,
void OnChannelReadableState_s(); void OnChannelReadableState_s();
void OnChannelWritableState_s(); void OnChannelWritableState_s();
void OnChannelRequestSignaling_s(int component); void OnChannelRequestSignaling_s(int component);
void OnChannelRequestSignaling_w(int component);
void OnConnecting_s(); void OnConnecting_s();
void OnChannelRouteChange_s(const TransportChannel* channel, void OnChannelRouteChange_s(const TransportChannel* channel,
const Candidate& remote_candidate); const Candidate& remote_candidate);