Remove ViESender.

Registers transport on construction removing the need for ViESender as a
hop and removing a potential deadlock by removing RegisterSendTransport.

BUG=1695, 2999
R=stefan@webrtc.org

Review URL: https://webrtc-codereview.appspot.com/57449004

Cr-Commit-Position: refs/heads/master@{#9309}
This commit is contained in:
Peter Boström 2015-05-28 14:10:39 +02:00
parent 259bd2034c
commit 2251d6e174
13 changed files with 40 additions and 228 deletions

View File

@ -185,23 +185,6 @@
fun:StartThread
}
{
bug_329_5
Memcheck:Param
socketcall.sendto(msg)
obj:*libpthread-*.so
fun:_ZN6webrtc14UdpSocketPosix6SendToEPKaiRKNS_13SocketAddressE
fun:_ZN6webrtc16UdpTransportImpl14SendRTCPPacketEiPKvi
fun:_ZN6webrtc9ViESender14SendRTCPPacketEiPKvi
fun:_ZN6webrtc10RTCPSender13SendToNetworkEPKht
fun:_ZN6webrtc10RTCPSender8SendRTCPEjiPKtbm
fun:_ZN6webrtc17ModuleRtpRtcpImpl7ProcessEv
fun:_ZN6webrtc17ProcessThreadImpl7ProcessEv
fun:_ZN6webrtc17ProcessThreadImpl3RunEPv
fun:_ZN6webrtc11ThreadPosix3RunEv
fun:StartThread
}
{
bug_329_7
Memcheck:Unaddressable

View File

@ -75,7 +75,6 @@ char kTSanDefaultSuppressions[] =
"deadlock:webrtc::ViEChannel::StartSend\n"
"deadlock:webrtc::ViECodecImpl::GetSendSideDelay\n"
"deadlock:webrtc::ViEEncoder::OnLocalSsrcChanged\n"
"deadlock:webrtc::ViESender::RegisterSendTransport\n"
// TODO(pbos): Trace events are racy due to lack of proper POD atomics.
// https://code.google.com/p/webrtc/issues/detail?id=2497

View File

@ -117,6 +117,7 @@ class Call : public webrtc::Call, public PacketReceiver {
// and receivers.
rtc::CriticalSection network_enabled_crit_;
bool network_enabled_ GUARDED_BY(network_enabled_crit_);
TransportAdapter transport_adapter_;
rtc::scoped_ptr<RWLockWrapper> receive_crit_;
std::map<uint32_t, AudioReceiveStream*> audio_receive_ssrcs_
@ -147,11 +148,12 @@ namespace internal {
Call::Call(const Call::Config& config)
: num_cpu_cores_(CpuInfo::DetectNumberOfCores()),
module_process_thread_(ProcessThread::Create()),
channel_group_(new ChannelGroup(module_process_thread_.get(), nullptr)),
channel_group_(new ChannelGroup(module_process_thread_.get())),
base_channel_id_(0),
next_channel_id_(base_channel_id_ + 1),
config_(config),
network_enabled_(true),
transport_adapter_(nullptr),
receive_crit_(RWLockWrapper::CreateRWLock()),
send_crit_(RWLockWrapper::CreateRWLock()) {
DCHECK(config.send_transport != nullptr);
@ -169,8 +171,8 @@ Call::Call(const Call::Config& config)
// TODO(pbos): Remove base channel when CreateReceiveChannel no longer
// requires one.
CHECK(channel_group_->CreateSendChannel(base_channel_id_, 0, num_cpu_cores_,
true));
CHECK(channel_group_->CreateSendChannel(
base_channel_id_, 0, &transport_adapter_, num_cpu_cores_, true));
if (config.overuse_callback) {
overuse_observer_proxy_.reset(

View File

@ -138,7 +138,8 @@ VideoReceiveStream::VideoReceiveStream(int num_cpu_cores,
channel_id_(channel_id),
voe_sync_interface_(nullptr) {
CHECK(channel_group_->CreateReceiveChannel(channel_id_, 0, base_channel_id,
num_cpu_cores, true));
&transport_adapter_, num_cpu_cores,
true));
vie_channel_ = channel_group_->GetChannel(channel_id_);
@ -184,8 +185,6 @@ VideoReceiveStream::VideoReceiveStream(int num_cpu_cores,
}
}
vie_channel_->RegisterSendTransport(&transport_adapter_);
if (config_.rtp.fec.ulpfec_payload_type != -1) {
// ULPFEC without RED doesn't make sense.
DCHECK(config_.rtp.fec.red_payload_type != -1);
@ -260,8 +259,6 @@ VideoReceiveStream::~VideoReceiveStream() {
for (size_t i = 0; i < config_.decoders.size(); ++i)
vie_channel_->DeRegisterExternalDecoder(config_.decoders[i].payload_type);
vie_channel_->DeregisterSendTransport();
if (voe_sync_interface_ != nullptr) {
vie_channel_->SetVoiceChannel(-1, nullptr);
voe_sync_interface_->Release();

View File

@ -119,7 +119,8 @@ VideoSendStream::VideoSendStream(
channel_id_(channel_id),
use_config_bitrate_(true),
stats_proxy_(Clock::GetRealTimeClock(), config) {
CHECK(channel_group->CreateSendChannel(channel_id_, 0, num_cpu_cores, true));
CHECK(channel_group->CreateSendChannel(channel_id_, 0, &transport_adapter_,
num_cpu_cores, true));
vie_channel_ = channel_group_->GetChannel(channel_id_);
vie_encoder_ = channel_group_->GetEncoder(channel_id_);
@ -180,7 +181,6 @@ VideoSendStream::VideoSendStream(
vie_capturer_ = new ViECapturer(module_process_thread_, vie_encoder_);
vie_channel_->RegisterSendTransport(&transport_adapter_);
// 28 to match packet overhead in ModuleRtpRtcpImpl.
vie_channel_->SetMTU(
static_cast<unsigned int>(config_.rtp.max_packet_size + 28));
@ -238,8 +238,6 @@ VideoSendStream::~VideoSendStream() {
vie_encoder_->RegisterPreEncodeCallback(nullptr);
vie_encoder_->RegisterPostEncodeImageCallback(nullptr);
vie_channel_->DeregisterSendTransport();
vie_capturer_->RegisterCpuOveruseObserver(nullptr);
delete vie_capturer_;

View File

@ -41,8 +41,6 @@ source_set("video_engine_core") {
"vie_receiver.h",
"vie_remb.cc",
"vie_remb.h",
"vie_sender.cc",
"vie_sender.h",
"vie_sync_module.cc",
"vie_sync_module.h",
]

View File

@ -49,7 +49,6 @@
'vie_channel_group.h',
'vie_encoder.h',
'vie_receiver.h',
'vie_sender.h',
'vie_sync_module.h',
# ViE
@ -65,7 +64,6 @@
'vie_encoder.cc',
'vie_receiver.cc',
'vie_remb.cc',
'vie_sender.cc',
'vie_sync_module.cc',
], # source
# TODO(jschuh): Bug 1348: fix size_t to int truncations.

View File

@ -83,6 +83,7 @@ ViEChannel::ViEChannel(int32_t channel_id,
int32_t engine_id,
uint32_t number_of_cores,
const Config& config,
Transport* transport,
ProcessThread& module_process_thread,
RtcpIntraFrameObserver* intra_frame_observer,
RtcpBandwidthObserver* bandwidth_observer,
@ -119,7 +120,7 @@ ViEChannel::ViEChannel(int32_t channel_id,
send_timestamp_extension_id_(kInvalidRtpExtensionId),
absolute_send_time_extension_id_(kInvalidRtpExtensionId),
video_rotation_extension_id_(kInvalidRtpExtensionId),
external_transport_(NULL),
transport_(transport),
decoder_reset_(true),
wait_for_key_frame_(false),
mtu_(0),
@ -1294,10 +1295,6 @@ void ViEChannel::RegisterSendBitrateObserver(
int32_t ViEChannel::StartSend() {
CriticalSectionScoped cs(callback_cs_.get());
if (!external_transport_) {
LOG(LS_ERROR) << "No transport set.";
return -1;
}
rtp_rtcp_->SetSendingMediaStatus(true);
if (rtp_rtcp_->Sending()) {
@ -1368,56 +1365,15 @@ int32_t ViEChannel::StopReceive() {
return 0;
}
int32_t ViEChannel::RegisterSendTransport(Transport* transport) {
if (rtp_rtcp_->Sending()) {
return -1;
}
CriticalSectionScoped cs(callback_cs_.get());
if (external_transport_) {
LOG_F(LS_ERROR) << "Transport already registered.";
return -1;
}
external_transport_ = transport;
vie_sender_.RegisterSendTransport(transport);
return 0;
}
int32_t ViEChannel::DeregisterSendTransport() {
CriticalSectionScoped cs(callback_cs_.get());
if (!external_transport_) {
return 0;
}
if (rtp_rtcp_->Sending()) {
LOG_F(LS_ERROR) << "Can't deregister transport when sending.";
return -1;
}
external_transport_ = NULL;
vie_sender_.DeregisterSendTransport();
return 0;
}
int32_t ViEChannel::ReceivedRTPPacket(
const void* rtp_packet, const size_t rtp_packet_length,
int32_t ViEChannel::ReceivedRTPPacket(const void* rtp_packet,
size_t rtp_packet_length,
const PacketTime& packet_time) {
{
CriticalSectionScoped cs(callback_cs_.get());
if (!external_transport_) {
return -1;
}
}
return vie_receiver_.ReceivedRTPPacket(
rtp_packet, rtp_packet_length, packet_time);
}
int32_t ViEChannel::ReceivedRTCPPacket(
const void* rtcp_packet, const size_t rtcp_packet_length) {
{
CriticalSectionScoped cs(callback_cs_.get());
if (!external_transport_) {
return -1;
}
}
int32_t ViEChannel::ReceivedRTCPPacket(const void* rtcp_packet,
size_t rtcp_packet_length) {
return vie_receiver_.ReceivedRTCPPacket(rtcp_packet, rtcp_packet_length);
}
@ -1636,7 +1592,7 @@ RtpRtcp::Configuration ViEChannel::CreateRtpRtcpConfiguration() {
configuration.id = ViEModuleId(engine_id_, channel_id_);
configuration.audio = false;
configuration.receiver_only = !sender_;
configuration.outgoing_transport = &vie_sender_;
configuration.outgoing_transport = transport_;
if (sender_)
configuration.intra_frame_callback = intra_frame_observer_;
configuration.rtt_stats = rtt_stats_;

View File

@ -24,7 +24,6 @@
#include "webrtc/typedefs.h"
#include "webrtc/video_engine/vie_defines.h"
#include "webrtc/video_engine/vie_receiver.h"
#include "webrtc/video_engine/vie_sender.h"
#include "webrtc/video_engine/vie_sync_module.h"
namespace webrtc {
@ -105,6 +104,7 @@ class ViEChannel : public VCMFrameTypeCallback,
int32_t engine_id,
uint32_t number_of_cores,
const Config& config,
Transport* transport,
ProcessThread& module_process_thread,
RtcpIntraFrameObserver* intra_frame_observer,
RtcpBandwidthObserver* bandwidth_observer,
@ -268,15 +268,9 @@ class ViEChannel : public VCMFrameTypeCallback,
int32_t StartReceive();
int32_t StopReceive();
int32_t RegisterSendTransport(Transport* transport);
int32_t DeregisterSendTransport();
// Incoming packet from external transport.
int32_t ReceivedRTPPacket(const void* rtp_packet,
const size_t rtp_packet_length,
const PacketTime& packet_time);
// Incoming packet from external transport.
int32_t ReceivedRTCPPacket(const void* rtcp_packet,
const size_t rtcp_packet_length);
@ -487,7 +481,6 @@ class ViEChannel : public VCMFrameTypeCallback,
VideoCodingModule* const vcm_;
ViEReceiver vie_receiver_;
ViESender vie_sender_;
ViESyncModule vie_sync_;
// Helper to report call statistics.
@ -511,7 +504,7 @@ class ViEChannel : public VCMFrameTypeCallback,
int absolute_send_time_extension_id_;
int video_rotation_extension_id_;
Transport* external_transport_;
Transport* const transport_;
bool decoder_reset_;
// Current receive codec used for codec change callback.

View File

@ -141,7 +141,7 @@ class WrappingBitrateEstimator : public RemoteBitrateEstimator {
};
} // namespace
ChannelGroup::ChannelGroup(ProcessThread* process_thread, const Config* config)
ChannelGroup::ChannelGroup(ProcessThread* process_thread)
: remb_(new VieRemb()),
bitrate_allocator_(new BitrateAllocator()),
call_stats_(new CallStats()),
@ -154,8 +154,7 @@ ChannelGroup::ChannelGroup(ProcessThread* process_thread, const Config* config)
BitrateController::kDefaultStartBitrateKbps,
0)),
encoder_map_cs_(CriticalSectionWrapper::CreateCriticalSection()),
config_(config),
own_config_(),
config_(new Config),
process_thread_(process_thread),
pacer_thread_(ProcessThread::Create()),
// Constructed last as this object calls the provided callback on
@ -163,16 +162,8 @@ ChannelGroup::ChannelGroup(ProcessThread* process_thread, const Config* config)
bitrate_controller_(
BitrateController::CreateBitrateController(Clock::GetRealTimeClock(),
this)) {
if (!config) {
own_config_.reset(new Config);
config_ = own_config_.get();
}
DCHECK(config_); // Must have a valid config pointer here.
remote_bitrate_estimator_.reset(
new WrappingBitrateEstimator(remb_.get(),
Clock::GetRealTimeClock(),
*config_));
remote_bitrate_estimator_.reset(new WrappingBitrateEstimator(
remb_.get(), Clock::GetRealTimeClock(), *config_.get()));
call_stats_->RegisterStatsObserver(remote_bitrate_estimator_.get());
@ -200,16 +191,18 @@ ChannelGroup::~ChannelGroup() {
bool ChannelGroup::CreateSendChannel(int channel_id,
int engine_id,
Transport* transport,
int number_of_cores,
bool disable_default_encoder) {
rtc::scoped_ptr<ViEEncoder> vie_encoder(new ViEEncoder(
channel_id, number_of_cores, *config_, *process_thread_, pacer_.get(),
bitrate_allocator_.get(), bitrate_controller_.get(), false));
rtc::scoped_ptr<ViEEncoder> vie_encoder(
new ViEEncoder(channel_id, number_of_cores, *config_.get(),
*process_thread_, pacer_.get(), bitrate_allocator_.get(),
bitrate_controller_.get(), false));
if (!vie_encoder->Init()) {
return false;
}
ViEEncoder* encoder = vie_encoder.get();
if (!CreateChannel(channel_id, engine_id, number_of_cores,
if (!CreateChannel(channel_id, engine_id, transport, number_of_cores,
vie_encoder.release(), true, disable_default_encoder)) {
return false;
}
@ -232,15 +225,17 @@ bool ChannelGroup::CreateSendChannel(int channel_id,
bool ChannelGroup::CreateReceiveChannel(int channel_id,
int engine_id,
int base_channel_id,
Transport* transport,
int number_of_cores,
bool disable_default_encoder) {
ViEEncoder* encoder = GetEncoder(base_channel_id);
return CreateChannel(channel_id, engine_id, number_of_cores, encoder, false,
disable_default_encoder);
return CreateChannel(channel_id, engine_id, transport, number_of_cores,
encoder, false, disable_default_encoder);
}
bool ChannelGroup::CreateChannel(int channel_id,
int engine_id,
Transport* transport,
int number_of_cores,
ViEEncoder* vie_encoder,
bool sender,
@ -248,8 +243,8 @@ bool ChannelGroup::CreateChannel(int channel_id,
DCHECK(vie_encoder);
rtc::scoped_ptr<ViEChannel> channel(new ViEChannel(
channel_id, engine_id, number_of_cores, *config_, *process_thread_,
encoder_state_feedback_->GetRtcpIntraFrameObserver(),
channel_id, engine_id, number_of_cores, *config_.get(), transport,
*process_thread_, encoder_state_feedback_->GetRtcpIntraFrameObserver(),
bitrate_controller_->CreateRtcpBandwidthObserver(),
remote_bitrate_estimator_.get(), call_stats_->rtcp_rtt_stats(),
pacer_.get(), packet_router_.get(), sender, disable_default_encoder));

View File

@ -40,15 +40,17 @@ typedef std::list<ViEChannel*> ChannelList;
// group are assumed to send/receive data to the same end-point.
class ChannelGroup : public BitrateObserver {
public:
ChannelGroup(ProcessThread* process_thread, const Config* config);
explicit ChannelGroup(ProcessThread* process_thread);
~ChannelGroup();
bool CreateSendChannel(int channel_id,
int engine_id,
Transport* transport,
int number_of_cores,
bool disable_default_encoder);
bool CreateReceiveChannel(int channel_id,
int engine_id,
int base_channel_id,
Transport* transport,
int number_of_cores,
bool disable_default_encoder);
void DeleteChannel(int channel_id);
@ -84,6 +86,7 @@ class ChannelGroup : public BitrateObserver {
bool CreateChannel(int channel_id,
int engine_id,
Transport* transport,
int number_of_cores,
ViEEncoder* vie_encoder,
bool sender,
@ -105,9 +108,7 @@ class ChannelGroup : public BitrateObserver {
EncoderMap send_encoders_;
rtc::scoped_ptr<CriticalSectionWrapper> encoder_map_cs_;
const Config* config_;
// Placeholder for the case where this owns the config.
rtc::scoped_ptr<Config> own_config_;
const rtc::scoped_ptr<Config> config_;
// Registered at construct time and assumed to outlive this class.
ProcessThread* process_thread_;

View File

@ -1,60 +0,0 @@
/*
* Copyright (c) 2012 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#include "webrtc/video_engine/vie_sender.h"
#include "webrtc/modules/rtp_rtcp/source/rtp_sender.h"
#include "webrtc/system_wrappers/interface/critical_section_wrapper.h"
#include "webrtc/system_wrappers/interface/trace.h"
namespace webrtc {
ViESender::ViESender()
: critsect_(CriticalSectionWrapper::CreateCriticalSection()),
transport_(NULL) {
}
int ViESender::RegisterSendTransport(Transport* transport) {
CriticalSectionScoped cs(critsect_.get());
if (transport_) {
return -1;
}
transport_ = transport;
return 0;
}
int ViESender::DeregisterSendTransport() {
CriticalSectionScoped cs(critsect_.get());
if (transport_ == NULL) {
return -1;
}
transport_ = NULL;
return 0;
}
int ViESender::SendPacket(int id, const void* data, size_t len) {
CriticalSectionScoped cs(critsect_.get());
if (!transport_) {
// No transport
return -1;
}
return transport_->SendPacket(id, data, len);
}
int ViESender::SendRTCPPacket(int id, const void* data, size_t len) {
CriticalSectionScoped cs(critsect_.get());
if (!transport_) {
return -1;
}
return transport_->SendRTCPPacket(id, data, len);
}
} // namespace webrtc

View File

@ -1,48 +0,0 @@
/*
* Copyright (c) 2012 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
// ViESender is responsible for sending packets to network.
#ifndef WEBRTC_VIDEO_ENGINE_VIE_SENDER_H_
#define WEBRTC_VIDEO_ENGINE_VIE_SENDER_H_
#include "webrtc/base/scoped_ptr.h"
#include "webrtc/common_types.h"
#include "webrtc/engine_configurations.h"
#include "webrtc/typedefs.h"
#include "webrtc/video_engine/vie_defines.h"
namespace webrtc {
class CriticalSectionWrapper;
class Transport;
class VideoCodingModule;
class ViESender: public Transport {
public:
ViESender();
// Registers transport to use for sending RTP and RTCP.
int RegisterSendTransport(Transport* transport);
int DeregisterSendTransport();
// Implements Transport.
int SendPacket(int vie_id, const void* data, size_t len) override;
int SendRTCPPacket(int vie_id, const void* data, size_t len) override;
private:
rtc::scoped_ptr<CriticalSectionWrapper> critsect_;
Transport* transport_;
};
} // namespace webrtc
#endif // WEBRTC_VIDEO_ENGINE_VIE_SENDER_H_