Cleaned up the data path for payload data, made callbacks to rtp_receiver nonoptional.

The audio receiver is now completely independent of rtp_receiver: video will hopefully be too in the next patch.

BUG=
TEST=vie & voe_auto_test full runs

Review URL: https://webrtc-codereview.appspot.com/1014006

git-svn-id: http://webrtc.googlecode.com/svn/trunk@3372 4adac7df-926f-26a2-2b94-8c16560cd09d
This commit is contained in:
phoglund@webrtc.org 2013-01-14 10:01:55 +00:00
parent 49273ffa79
commit a22a9bd9ca
12 changed files with 184 additions and 152 deletions

View File

@ -26,31 +26,19 @@ class Transport;
class RtpRtcp : public Module {
public:
struct Configuration {
Configuration()
: id(-1),
audio(false),
clock(NULL),
default_module(NULL),
incoming_data(NULL),
incoming_messages(NULL),
outgoing_transport(NULL),
rtcp_feedback(NULL),
intra_frame_callback(NULL),
bandwidth_callback(NULL),
rtt_observer(NULL),
audio_messages(NULL),
remote_bitrate_estimator(NULL),
paced_sender(NULL) {
}
Configuration();
/* id - Unique identifier of this RTP/RTCP module object
* audio - True for a audio version of the RTP/RTCP module
* object false will create a video version
* clock - The clock to use to read time. If NULL object
* will be using the system clock.
* incoming_data - Callback object that will receive the incoming
* data
* data. May not be NULL; default callback will do
* nothing.
* incoming_messages - Callback object that will receive the incoming
* RTP messages.
* RTP messages. May not be NULL; default callback
* will do nothing.
* outgoing_transport - Transport object that will be called when packets
* are ready to be sent out on the network
* rtcp_feedback - Callback object that will receive the incoming
@ -58,7 +46,8 @@ class RtpRtcp : public Module {
* intra_frame_callback - Called when the receiver request a intra frame.
* bandwidth_callback - Called when we receive a changed estimate from
* the receiver of out stream.
* audio_messages - Telehone events.
* audio_messages - Telehone events. May not be NULL; default callback
* will do nothing.
* remote_bitrate_estimator - Estimates the bandwidth available for a set of
* streams from the same client.
* paced_sender - Spread any bursts of packets into smaller

View File

@ -269,5 +269,63 @@ class RtpRtcpClock {
virtual void CurrentNTP(WebRtc_UWord32& secs, WebRtc_UWord32& frac) = 0;
};
// Null object version of RtpFeedback.
class NullRtpFeedback : public RtpFeedback {
public:
virtual ~NullRtpFeedback() {}
virtual WebRtc_Word32 OnInitializeDecoder(
const WebRtc_Word32 id,
const WebRtc_Word8 payloadType,
const char payloadName[RTP_PAYLOAD_NAME_SIZE],
const int frequency,
const WebRtc_UWord8 channels,
const WebRtc_UWord32 rate) {
return 0;
}
virtual void OnPacketTimeout(const WebRtc_Word32 id) {}
virtual void OnReceivedPacket(const WebRtc_Word32 id,
const RtpRtcpPacketType packetType) {}
virtual void OnPeriodicDeadOrAlive(const WebRtc_Word32 id,
const RTPAliveType alive) {}
virtual void OnIncomingSSRCChanged(const WebRtc_Word32 id,
const WebRtc_UWord32 SSRC) {}
virtual void OnIncomingCSRCChanged(const WebRtc_Word32 id,
const WebRtc_UWord32 CSRC,
const bool added) {}
};
// Null object version of RtpData.
class NullRtpData : public RtpData {
public:
virtual ~NullRtpData() {}
virtual WebRtc_Word32 OnReceivedPayloadData(
const WebRtc_UWord8* payloadData,
const WebRtc_UWord16 payloadSize,
const WebRtcRTPHeader* rtpHeader) {
return 0;
}
};
// Null object version of RtpAudioFeedback.
class NullRtpAudioFeedback : public RtpAudioFeedback {
public:
virtual ~NullRtpAudioFeedback() {}
virtual void OnReceivedTelephoneEvent(const WebRtc_Word32 id,
const WebRtc_UWord8 event,
const bool endOfEvent) {}
virtual void OnPlayTelephoneEvent(const WebRtc_Word32 id,
const WebRtc_UWord8 event,
const WebRtc_UWord16 lengthMs,
const WebRtc_UWord8 volume) {}
};
} // namespace webrtc
#endif // WEBRTC_MODULES_RTP_RTCP_INTERFACE_RTP_RTCP_DEFINES_H_

View File

@ -17,7 +17,7 @@ namespace webrtc {
class MockRTPReceiverVideo : public RTPReceiverVideo {
public:
MockRTPReceiverVideo() : RTPReceiverVideo(0, NULL, NULL) {}
MockRTPReceiverVideo() : RTPReceiverVideo(0, NULL, NULL, NULL) {}
MOCK_METHOD1(ChangeUniqueId,
void(const WebRtc_Word32 id));
MOCK_METHOD3(ReceiveRecoveredPacketCallback,

View File

@ -35,13 +35,21 @@ RTPReceiver::RTPReceiver(const WebRtc_Word32 id,
const bool audio,
RtpRtcpClock* clock,
ModuleRtpRtcpImpl* owner,
RtpAudioFeedback* incoming_messages_callback)
RtpAudioFeedback* incoming_audio_messages_callback,
RtpData* incoming_payload_callback,
RtpFeedback* incoming_messages_callback)
: Bitrate(clock),
// TODO(phoglund): Remove hacks requiring direct access to the
// audio receiver and only instantiate one of these directly into the
// rtp_media_receiver_ field. Right now an audio receiver carries around a
// video handler and vice versa, which doesn't make sense.
rtp_receiver_audio_(new RTPReceiverAudio(
id, incoming_payload_callback, incoming_audio_messages_callback)),
rtp_receiver_video_(new RTPReceiverVideo(
id, this, incoming_payload_callback, owner)),
id_(id),
rtp_rtcp_(*owner),
critical_section_cbs_(CriticalSectionWrapper::CreateCriticalSection()),
cb_rtp_feedback_(NULL),
cb_rtp_data_(NULL),
cb_rtp_feedback_(incoming_messages_callback),
critical_section_rtp_receiver_(
CriticalSectionWrapper::CreateCriticalSection()),
@ -94,14 +102,9 @@ RTPReceiver::RTPReceiver(const WebRtc_Word32 id,
nack_method_(kNackOff),
rtx_(false),
ssrc_rtx_(0) {
// TODO(phoglund): Remove hacks requiring direct access to the audio receiver
// and only instantiate one of these directly into the rtp_media_receiver_
// field. Right now an audio receiver carries around a video handler and
// vice versa, which doesn't make sense.
rtp_receiver_audio_ = new RTPReceiverAudio(id, this,
incoming_messages_callback);
rtp_receiver_video_ = new RTPReceiverVideo(id, this, owner);
assert(incoming_audio_messages_callback &&
incoming_messages_callback &&
incoming_payload_callback);
if (audio) {
rtp_media_receiver_ = rtp_receiver_audio_;
} else {
@ -115,13 +118,10 @@ RTPReceiver::RTPReceiver(const WebRtc_Word32 id,
}
RTPReceiver::~RTPReceiver() {
if (cb_rtp_feedback_) {
for (int i = 0; i < num_csrcs_; ++i) {
cb_rtp_feedback_->OnIncomingCSRCChanged(id_, current_remote_csrc_[i],
false);
}
}
delete critical_section_cbs_;
delete critical_section_rtp_receiver_;
while (!payload_type_map_.empty()) {
@ -187,18 +187,13 @@ void RTPReceiver::PacketTimeout() {
last_received_media_payload_type_ = -1;
}
}
CriticalSectionScoped lock(critical_section_cbs_);
if (packet_time_out && cb_rtp_feedback_) {
if (packet_time_out) {
cb_rtp_feedback_->OnPacketTimeout(id_);
}
}
void RTPReceiver::ProcessDeadOrAlive(const bool rtcp_alive,
const WebRtc_Word64 now) {
if (cb_rtp_feedback_ == NULL) {
// No callback.
return;
}
RTPAliveType alive = kRtpDead;
if (last_receive_time_ + 1000 > now) {
@ -214,10 +209,7 @@ void RTPReceiver::ProcessDeadOrAlive(const bool rtcp_alive,
}
}
CriticalSectionScoped lock(critical_section_cbs_);
if (cb_rtp_feedback_) {
cb_rtp_feedback_->OnPeriodicDeadOrAlive(id_, alive);
}
}
WebRtc_UWord16 RTPReceiver::PacketOHReceived() const {
@ -235,20 +227,6 @@ WebRtc_UWord32 RTPReceiver::ByteCountReceived() const {
return received_byte_count_;
}
WebRtc_Word32 RTPReceiver::RegisterIncomingRTPCallback(
RtpFeedback* incoming_messages_callback) {
CriticalSectionScoped lock(critical_section_cbs_);
cb_rtp_feedback_ = incoming_messages_callback;
return 0;
}
WebRtc_Word32 RTPReceiver::RegisterIncomingDataCallback(
RtpData* incoming_data_callback) {
CriticalSectionScoped lock(critical_section_cbs_);
cb_rtp_data_ = incoming_data_callback;
return 0;
}
WebRtc_Word32 RTPReceiver::RegisterReceivePayload(
const char payload_name[RTP_PAYLOAD_NAME_SIZE],
const WebRtc_Word8 payload_type,
@ -610,8 +588,6 @@ WebRtc_Word32 RTPReceiver::IncomingRTPPacket(
}
if (last_receive_time_ == 0) {
// Trigger only once.
CriticalSectionScoped lock(critical_section_cbs_);
if (cb_rtp_feedback_) {
if (length - rtp_header->header.headerLength == 0) {
// Keep-alive packet.
cb_rtp_feedback_->OnReceivedPacket(id_, kPacketKeepAlive);
@ -619,7 +595,6 @@ WebRtc_Word32 RTPReceiver::IncomingRTPPacket(
cb_rtp_feedback_->OnReceivedPacket(id_, kPacketRtp);
}
}
}
WebRtc_Word8 first_payload_byte = 0;
if (length > 0) {
first_payload_byte = packet[rtp_header->header.headerLength];
@ -686,19 +661,6 @@ WebRtc_Word32 RTPReceiver::IncomingRTPPacket(
return ret_val;
}
// Implementation note: must not hold critsect when called!
WebRtc_Word32 RTPReceiver::CallbackOfReceivedPayloadData(
const WebRtc_UWord8* payload_data,
const WebRtc_UWord16 payload_size,
const WebRtcRTPHeader* rtp_header) {
CriticalSectionScoped lock(critical_section_cbs_);
if (cb_rtp_data_) {
return cb_rtp_data_->OnReceivedPayloadData(payload_data, payload_size,
rtp_header);
}
return -1;
}
// Implementation note: we expect to have the critical_section_rtp_receiver_
// critsect when we call this.
void RTPReceiver::UpdateStatistics(const WebRtcRTPHeader* rtp_header,
@ -989,10 +951,6 @@ void RTPReceiver::CheckSSRCChanged(const WebRtcRTPHeader* rtp_header) {
// We need to get this to our RTCP sender and receiver.
// We need to do this outside critical section.
rtp_rtcp_.SetRemoteSSRC(rtp_header->header.ssrc);
}
CriticalSectionScoped lock(critical_section_cbs_);
if (cb_rtp_feedback_) {
if (new_ssrc) {
cb_rtp_feedback_->OnIncomingSSRCChanged(id_, rtp_header->header.ssrc);
}
if (re_initialize_decoder) {
@ -1005,7 +963,6 @@ void RTPReceiver::CheckSSRCChanged(const WebRtcRTPHeader* rtp_header) {
rtp_header->header.payloadType);
}
}
}
}
// Implementation note: must not hold critsect when called.
@ -1104,15 +1061,12 @@ WebRtc_Word32 RTPReceiver::CheckPayloadChanged(
} // End critsect.
if (re_initialize_decoder) {
CriticalSectionScoped lock(critical_section_cbs_);
if (cb_rtp_feedback_) {
if (-1 == rtp_media_receiver_->InvokeOnInitializeDecoder(
cb_rtp_feedback_, id_, payload_type, payload_name,
*specific_payload)) {
return -1; // Wrong payload type.
}
}
}
return 0;
}
@ -1159,10 +1113,6 @@ void RTPReceiver::CheckCSRC(const WebRtcRTPHeader* rtp_header) {
}
} // End critsect.
CriticalSectionScoped lock(critical_section_cbs_);
if (cb_rtp_feedback_ == NULL) {
return;
}
bool have_called_callback = false;
// Search for new CSRC in old array.
for (WebRtc_UWord8 i = 0; i < rtp_header->header.numCSRCs; ++i) {

View File

@ -32,11 +32,15 @@ class RTPReceiverStrategy;
class RTPReceiver : public Bitrate {
public:
// Callbacks passed in here may not be NULL (use Null object callbacks if you
// want callbacks to do nothing).
RTPReceiver(const WebRtc_Word32 id,
const bool audio,
RtpRtcpClock* clock,
ModuleRtpRtcpImpl* owner,
RtpAudioFeedback* incoming_messages_callback);
RtpAudioFeedback* incoming_audio_messages_callback,
RtpData* incoming_payload_callback,
RtpFeedback* incoming_messages_callback);
virtual ~RTPReceiver();
@ -50,10 +54,6 @@ class RTPReceiver : public Bitrate {
void ProcessBitrate();
WebRtc_Word32 RegisterIncomingDataCallback(RtpData* incoming_data_callback);
WebRtc_Word32 RegisterIncomingRTPCallback(
RtpFeedback* incoming_messages_callback);
WebRtc_Word32 RegisterReceivePayload(
const char payload_name[RTP_PAYLOAD_NAME_SIZE],
const WebRtc_Word8 payload_type,
@ -161,11 +161,6 @@ class RTPReceiver : public Bitrate {
return rtp_receiver_audio_;
}
virtual WebRtc_Word32 CallbackOfReceivedPayloadData(
const WebRtc_UWord8* payload_data,
const WebRtc_UWord16 payload_size,
const WebRtcRTPHeader* rtp_header);
virtual WebRtc_Word8 REDPayloadType() const;
bool HaveNotReceivedPackets() const;
@ -202,9 +197,7 @@ class RTPReceiver : public Bitrate {
WebRtc_Word32 id_;
ModuleRtpRtcpImpl& rtp_rtcp_;
CriticalSectionWrapper* critical_section_cbs_;
RtpFeedback* cb_rtp_feedback_;
RtpData* cb_rtp_data_;
CriticalSectionWrapper* critical_section_rtp_receiver_;
mutable WebRtc_Word64 last_receive_time_;

View File

@ -15,15 +15,14 @@
#include <math.h> // pow()
#include "critical_section_wrapper.h"
#include "rtp_receiver.h"
#include "trace.h"
namespace webrtc {
RTPReceiverAudio::RTPReceiverAudio(const WebRtc_Word32 id,
RTPReceiver* parent,
RtpData* data_callback,
RtpAudioFeedback* incomingMessagesCallback)
: _id(id),
_parent(parent),
: RTPReceiverStrategy(data_callback),
_id(id),
_criticalSectionRtpReceiverAudio(
CriticalSectionWrapper::CreateCriticalSection()),
_lastReceivedFrequency(8000),
@ -512,13 +511,13 @@ RTPReceiverAudio::ParseAudioCodecSpecific(WebRtcRTPHeader* rtpHeader,
rtpHeader->header.payloadType = payloadData[0];
// only one frame in the RED strip the one byte to help NetEq
return _parent->CallbackOfReceivedPayloadData(payloadData+1,
return data_callback_->OnReceivedPayloadData(payloadData+1,
payloadLength-1,
rtpHeader);
}
rtpHeader->type.Audio.channel = audioSpecific.channels;
return _parent->CallbackOfReceivedPayloadData(
return data_callback_->OnReceivedPayloadData(
payloadData, payloadLength, rtpHeader);
}
} // namespace webrtc

View File

@ -22,14 +22,13 @@
namespace webrtc {
class CriticalSectionWrapper;
class RTPReceiver;
// Handles audio RTP packets. This class is thread-safe.
class RTPReceiverAudio : public RTPReceiverStrategy
{
public:
RTPReceiverAudio(const WebRtc_Word32 id,
RTPReceiver* parent,
RtpData* data_callback,
RtpAudioFeedback* incomingMessagesCallback);
WebRtc_UWord32 AudioFrequency() const;
@ -122,7 +121,6 @@ private:
const bool isRED);
WebRtc_Word32 _id;
RTPReceiver* _parent;
scoped_ptr<CriticalSectionWrapper> _criticalSectionRtpReceiverAudio;
WebRtc_UWord32 _lastReceivedFrequency;

View File

@ -14,7 +14,8 @@
namespace webrtc {
RTPReceiverStrategy::RTPReceiverStrategy() {
RTPReceiverStrategy::RTPReceiverStrategy(RtpData* data_callback)
: data_callback_(data_callback) {
memset(&last_payload_, 0, sizeof(last_payload_));
}

View File

@ -22,15 +22,22 @@ namespace webrtc {
// This class is not thread-safe and must be protected by its caller.
class RTPReceiverStrategy {
public:
RTPReceiverStrategy();
// The data callback is where we should send received payload data.
// See ParseRtpPacket. This class does not claim ownership of the callback.
// Implementations must NOT hold any critical sections while calling the
// callback.
//
// Note: Implementations may call the callback for other reasons than calls
// to ParseRtpPacket, for instance if the implementation somehow recovers a
// packet.
RTPReceiverStrategy(RtpData* data_callback);
virtual ~RTPReceiverStrategy() {}
// Parses the RTP packet. Implementations should keep a reference to the
// calling RTPReceiver and call CallbackOfReceivedPayloadData if parsing
// succeeds.
// TODO(phoglund): This interaction is really ugly: clean up by removing
// the need of a back reference to parent, perhaps by returning something
// instead of calling back.
// Parses the RTP packet and calls the data callback with the payload data.
// Implementations are encouraged to use the provided packet buffer and RTP
// header as arguments to the callback; implementations are also allowed to
// make changes in the data as necessary. The specific_payload argument
// provides audio or video-specific data.
virtual WebRtc_Word32 ParseRtpPacket(
WebRtcRTPHeader* rtp_header,
const ModuleRTPUtility::PayloadUnion& specific_payload,
@ -108,6 +115,7 @@ class RTPReceiverStrategy {
protected:
ModuleRTPUtility::PayloadUnion last_payload_;
RtpData* data_callback_;
};
} // namespace webrtc

View File

@ -29,8 +29,10 @@ WebRtc_UWord32 BitRateBPS(WebRtc_UWord16 x )
RTPReceiverVideo::RTPReceiverVideo(const WebRtc_Word32 id,
RTPReceiver* parent,
RtpData* data_callback,
ModuleRtpRtcpImpl* owner)
: _id(id),
: RTPReceiverStrategy(data_callback),
_id(id),
_parent(parent),
_criticalSectionReceiverVideo(
CriticalSectionWrapper::CreateCriticalSection()),
@ -176,7 +178,7 @@ WebRtc_Word32 RTPReceiverVideo::ParseVideoCodecSpecific(
}
// Pass the length of FEC packets so that they can be accounted for in
// the bandwidth estimator.
retVal = _parent->CallbackOfReceivedPayloadData(NULL, payloadDataLength,
retVal = data_callback_->OnReceivedPayloadData(NULL, payloadDataLength,
rtpHeader);
}
} else {
@ -336,7 +338,7 @@ WebRtc_Word32 RTPReceiverVideo::ReceiveVp8Codec(
// we have an "empty" VP8 packet, it's ok, could be one way video
// Inform the jitter buffer about this packet.
rtpHeader->frameType = kFrameEmpty;
if (_parent->CallbackOfReceivedPayloadData(NULL, 0, rtpHeader) != 0) {
if (data_callback_->OnReceivedPayloadData(NULL, 0, rtpHeader) != 0) {
return -1;
}
return 0;
@ -369,7 +371,7 @@ WebRtc_Word32 RTPReceiverVideo::ReceiveVp8Codec(
toHeader->partitionId = fromHeader->partitionID;
toHeader->beginningOfPartition = fromHeader->beginningOfPartition;
if(_parent->CallbackOfReceivedPayloadData(parsedPacket.info.VP8.data,
if(data_callback_->OnReceivedPayloadData(parsedPacket.info.VP8.data,
parsedPacket.info.VP8.dataLength,
rtpHeader) != 0) {
return -1;
@ -393,7 +395,7 @@ WebRtc_Word32 RTPReceiverVideo::ReceiveGenericCodec(
}
_criticalSectionReceiverVideo->Leave();
if (_parent->CallbackOfReceivedPayloadData(payloadData, payloadDataLength,
if (data_callback_->OnReceivedPayloadData(payloadData, payloadDataLength,
rtpHeader) != 0) {
return -1;
}

View File

@ -12,7 +12,6 @@
#define WEBRTC_MODULES_RTP_RTCP_SOURCE_RTP_RECEIVER_VIDEO_H_
#include "bitrate.h"
#include "rtp_receiver.h"
#include "rtp_receiver_strategy.h"
#include "rtp_rtcp_defines.h"
#include "rtp_utility.h"
@ -23,11 +22,14 @@ namespace webrtc {
class CriticalSectionWrapper;
class ModuleRtpRtcpImpl;
class ReceiverFEC;
class RTPReceiver;
class RTPReceiverVideo : public RTPReceiverStrategy {
public:
// TODO(phoglund): Get rid of dependency on "parent".
RTPReceiverVideo(const WebRtc_Word32 id,
RTPReceiver* parent,
RtpData* data_callback,
ModuleRtpRtcpImpl* owner);
virtual ~RTPReceiverVideo();

View File

@ -41,6 +41,38 @@ namespace webrtc {
const WebRtc_UWord16 kDefaultRtt = 200;
static RtpData* NullObjectRtpData() {
static NullRtpData null_rtp_data;
return &null_rtp_data;
}
static RtpFeedback* NullObjectRtpFeedback() {
static NullRtpFeedback null_rtp_feedback;
return &null_rtp_feedback;
}
static RtpAudioFeedback* NullObjectRtpAudioFeedback() {
static NullRtpAudioFeedback null_rtp_audio_feedback;
return &null_rtp_audio_feedback;
}
RtpRtcp::Configuration::Configuration()
: id(-1),
audio(false),
clock(NULL),
default_module(NULL),
incoming_data(NullObjectRtpData()),
incoming_messages(NullObjectRtpFeedback()),
outgoing_transport(NULL),
rtcp_feedback(NULL),
intra_frame_callback(NULL),
bandwidth_callback(NULL),
rtt_observer(NULL),
audio_messages(NullObjectRtpAudioFeedback()),
remote_bitrate_estimator(NULL),
paced_sender(NULL) {
}
RtpRtcp* RtpRtcp::CreateRtpRtcp(const RtpRtcp::Configuration& configuration) {
if (configuration.clock) {
return new ModuleRtpRtcpImpl(configuration);
@ -64,7 +96,9 @@ ModuleRtpRtcpImpl::ModuleRtpRtcpImpl(const Configuration& configuration)
configuration.audio_messages,
configuration.paced_sender),
_rtpReceiver(configuration.id, configuration.audio, configuration.clock,
this, configuration.audio_messages),
this, configuration.audio_messages,
configuration.incoming_data,
configuration.incoming_messages),
_rtcpSender(configuration.id, configuration.audio, configuration.clock,
this),
_rtcpReceiver(configuration.id, configuration.clock, this),
@ -103,8 +137,6 @@ ModuleRtpRtcpImpl::ModuleRtpRtcpImpl(const Configuration& configuration)
_defaultModule->RegisterChildModule(this);
}
// TODO(pwestin) move to constructors of each rtp/rtcp sender/receiver object.
_rtpReceiver.RegisterIncomingDataCallback(configuration.incoming_data);
_rtpReceiver.RegisterIncomingRTPCallback(configuration.incoming_messages);
_rtcpReceiver.RegisterRtcpObservers(configuration.intra_frame_callback,
configuration.bandwidth_callback,
configuration.rtcp_feedback);