First version of the peerconnection client application for Linux.

I made several updates to the Windows version as well so that both
implementations share
a big portion of the code.
The underlying PeerConnection notifications have changed a bit since the last
update
so that there's still a known issue that I plan to fix in my next change:

  // TODO(tommi): There's a problem now with terminating connections:
  // When ending a conversation, both peers now send a signaling message
  // that indicates that their ports are closed (port=0).  The trouble this
  // causes us here is that we can interpret such a message as an invite
  // to a new conversation.  So, currently there is a bug that ending
  // a conversation can immediately start a new one.
  // To fix this I plan to change how conversations start and have a special
  // notification message via the server that prepares a client for a
  // conversation instead of automatically recognizing the first signaling
  // message as an invite.
Review URL: http://webrtc-codereview.appspot.com/112008

git-svn-id: http://webrtc.googlecode.com/svn/trunk@446 4adac7df-926f-26a2-2b94-8c16560cd09d
This commit is contained in:
tommi@webrtc.org
2011-08-25 15:03:52 +00:00
parent 137ece4ac3
commit 102b2270c7
13 changed files with 827 additions and 384 deletions

View File

@@ -15,34 +15,18 @@
#include "talk/p2p/client/basicportallocator.h"
#include "talk/session/phone/videorendererfactory.h"
Conductor::Conductor(PeerConnectionClient* client, MainWnd* main_wnd)
: handshake_(NONE),
waiting_for_audio_(false),
Conductor::Conductor(PeerConnectionClient* client, MainWindow* main_wnd)
: waiting_for_audio_(false),
waiting_for_video_(false),
peer_id_(-1),
video_channel_(""),
audio_channel_(""),
client_(client),
main_wnd_(main_wnd) {
// Create a window for posting notifications back to from other threads.
bool ok = Create(HWND_MESSAGE, L"Conductor", 0, 0, 0, 0, 0, 0);
ASSERT(ok);
client_->RegisterObserver(this);
main_wnd->RegisterObserver(this);
}
Conductor::~Conductor() {
ASSERT(peer_connection_.get() == NULL);
Destroy();
DeletePeerConnection();
}
bool Conductor::has_video() const {
return !video_channel_.empty();
}
bool Conductor::has_audio() const {
return !audio_channel_.empty();
}
bool Conductor::connection_active() const {
@@ -50,40 +34,52 @@ bool Conductor::connection_active() const {
}
void Conductor::Close() {
if (peer_connection_.get()) {
peer_connection_->Close();
video_channel_ = "";
audio_channel_ = "";
} else {
client_->SignOut();
}
client_->SignOut();
DeletePeerConnection();
}
bool Conductor::InitializePeerConnection() {
ASSERT(peer_connection_factory_.get() == NULL);
ASSERT(peer_connection_.get() == NULL);
ASSERT(port_allocator_.get() == NULL);
ASSERT(worker_thread_.get() == NULL);
port_allocator_.reset(new cricket::BasicPortAllocator(
new talk_base::BasicNetworkManager(),
talk_base::SocketAddress("stun.l.google.com", 19302),
talk_base::SocketAddress(),
talk_base::SocketAddress(), talk_base::SocketAddress()));
worker_thread_.reset(new talk_base::Thread());
if (!worker_thread_->SetName("workder thread", this) ||
if (!worker_thread_->SetName("ConductorWT", this) ||
!worker_thread_->Start()) {
LOG(WARNING) << "Failed to start libjingle workder thread";
LOG(LS_ERROR) << "Failed to start libjingle worker thread";
worker_thread_.reset();
return false;
}
peer_connection_.reset(
webrtc::PeerConnection::Create(GetPeerConnectionString(),
port_allocator_.get(),
worker_thread_.get()));
peer_connection_->RegisterObserver(this);
if (!peer_connection_->Init()) {
cricket::PortAllocator* port_allocator =
new cricket::BasicPortAllocator(
new talk_base::BasicNetworkManager(),
talk_base::SocketAddress("stun.l.google.com", 19302),
talk_base::SocketAddress(),
talk_base::SocketAddress(),
talk_base::SocketAddress());
peer_connection_factory_.reset(
new webrtc::PeerConnectionFactory(GetPeerConnectionString(),
port_allocator,
worker_thread_.get()));
if (!peer_connection_factory_->Initialize()) {
main_wnd_->MessageBox("Error",
"Failed to initialize PeerConnectionFactory", true);
DeletePeerConnection();
return false;
}
// Since we only ever use a single PeerConnection instance, we share
// the worker thread between the factory and the PC instance.
peer_connection_.reset(peer_connection_factory_->CreatePeerConnection(
worker_thread_.get()));
if (!peer_connection_.get()) {
main_wnd_->MessageBox("Error",
"CreatePeerConnection failed", true);
DeletePeerConnection();
} else {
peer_connection_->RegisterObserver(this);
bool audio = peer_connection_->SetAudioDevice("", "", 0);
LOG(INFO) << "SetAudioDevice " << (audio ? "succeeded." : "failed.");
}
@@ -92,13 +88,20 @@ bool Conductor::InitializePeerConnection() {
void Conductor::DeletePeerConnection() {
peer_connection_.reset();
handshake_ = NONE;
worker_thread_.reset();
video_channel_.clear();
audio_channel_.clear();
peer_connection_factory_.reset();
waiting_for_audio_ = false;
waiting_for_video_ = false;
peer_id_ = -1;
}
void Conductor::StartCaptureDevice() {
ASSERT(peer_connection_.get());
ASSERT(peer_connection_.get() != NULL);
if (main_wnd_->IsWindow()) {
main_wnd_->SwitchToStreamingUI();
if (main_wnd_->current_ui() != MainWindow::STREAMING)
main_wnd_->SwitchToStreamingUI();
if (peer_connection_->SetVideoCapture("")) {
peer_connection_->SetLocalVideoRenderer(main_wnd_->local_renderer());
@@ -113,41 +116,24 @@ void Conductor::StartCaptureDevice() {
//
void Conductor::OnInitialized() {
PostMessage(handle(), PEER_CONNECTION_ADDSTREAMS, 0, 0);
main_wnd_->QueueUIThreadCallback(PEER_CONNECTION_ADDSTREAMS, NULL);
}
void Conductor::OnError() {
LOG(INFO) << __FUNCTION__;
ASSERT(false);
LOG(LS_ERROR) << __FUNCTION__;
main_wnd_->QueueUIThreadCallback(PEER_CONNECTION_ERROR, NULL);
}
void Conductor::OnSignalingMessage(const std::string& msg) {
LOG(INFO) << __FUNCTION__;
bool shutting_down = (video_channel_.empty() && audio_channel_.empty());
if (handshake_ == OFFER_RECEIVED && !shutting_down)
StartCaptureDevice();
// Send our answer/offer/shutting down message.
// If we're the initiator, this will be our offer. If we just received
// an offer, this will be an answer. If PeerConnection::Close has been
// called, then this is our signal to the other end that we're shutting
// down.
if (handshake_ != QUIT_SENT) {
SendMessage(handle(), SEND_MESSAGE_TO_PEER, 0,
reinterpret_cast<LPARAM>(&msg));
}
if (shutting_down) {
handshake_ = QUIT_SENT;
PostMessage(handle(), PEER_CONNECTION_CLOSED, 0, 0);
}
std::string* msg_copy = new std::string(msg);
main_wnd_->QueueUIThreadCallback(SEND_MESSAGE_TO_PEER, msg_copy);
}
// Called when a local stream is added and initialized
void Conductor::OnLocalStreamInitialized(const std::string& stream_id,
bool video) {
bool video) {
LOG(INFO) << __FUNCTION__ << " " << stream_id;
bool send_notification = (waiting_for_video_ || waiting_for_audio_);
if (video) {
@@ -165,11 +151,10 @@ void Conductor::OnLocalStreamInitialized(const std::string& stream_id,
}
if (send_notification && !waiting_for_audio_ && !waiting_for_video_)
PostMessage(handle(), MEDIA_CHANNELS_INITIALIZED, 0, 0);
main_wnd_->QueueUIThreadCallback(MEDIA_CHANNELS_INITIALIZED, NULL);
if (!waiting_for_audio_ && !waiting_for_video_) {
PostMessage(handle(), PEER_CONNECTION_CONNECT, 0, 0);
}
if (!waiting_for_audio_ && !waiting_for_video_)
main_wnd_->QueueUIThreadCallback(PEER_CONNECTION_CONNECT, NULL);
}
// Called when a remote stream is added
@@ -177,35 +162,42 @@ void Conductor::OnAddStream(const std::string& stream_id, bool video) {
LOG(INFO) << __FUNCTION__ << " " << stream_id;
bool send_notification = (waiting_for_video_ || waiting_for_audio_);
if (video) {
ASSERT(video_channel_.empty());
// ASSERT(video_channel_.empty());
video_channel_ = stream_id;
waiting_for_video_ = false;
LOG(INFO) << "Setting video renderer for stream: " << stream_id;
bool ok = peer_connection_->SetVideoRenderer(stream_id,
main_wnd_->remote_renderer());
ASSERT(ok);
if (!ok)
LOG(LS_ERROR) << "SetVideoRenderer failed for : " << stream_id;
} else {
ASSERT(audio_channel_.empty());
// ASSERT(audio_channel_.empty());
audio_channel_ = stream_id;
waiting_for_audio_ = false;
}
if (send_notification && !waiting_for_audio_ && !waiting_for_video_)
PostMessage(handle(), MEDIA_CHANNELS_INITIALIZED, 0, 0);
main_wnd_->QueueUIThreadCallback(MEDIA_CHANNELS_INITIALIZED, NULL);
if (!waiting_for_audio_ && !waiting_for_video_) {
PostMessage(handle(), PEER_CONNECTION_CONNECT, 0, 0);
}
if (!waiting_for_audio_ && !waiting_for_video_)
main_wnd_->QueueUIThreadCallback(PEER_CONNECTION_CONNECT, NULL);
}
void Conductor::OnRemoveStream(const std::string& stream_id, bool video) {
LOG(INFO) << __FUNCTION__;
LOG(INFO) << __FUNCTION__ << (video ? " video: " : " audio: ") << stream_id;
if (video) {
ASSERT(video_channel_.compare(stream_id) == 0);
video_channel_ = "";
video_channel_.clear();
} else {
ASSERT(audio_channel_.compare(stream_id) == 0);
audio_channel_ = "";
audio_channel_.clear();
}
if (video_channel_.empty() && audio_channel_.empty()) {
LOG(INFO) << "All streams have been closed.";
main_wnd_->QueueUIThreadCallback(PEER_CONNECTION_CLOSED, NULL);
} else {
LOG(INFO) << "Remaining streams: '" << video_channel_ << "', '"
<< audio_channel_ << "'";
}
}
@@ -220,137 +212,144 @@ void Conductor::OnSignedIn() {
void Conductor::OnDisconnected() {
LOG(INFO) << __FUNCTION__;
if (peer_connection_.get()) {
peer_connection_->Close();
} else if (main_wnd_->IsWindow()) {
DeletePeerConnection();
if (main_wnd_->IsWindow())
main_wnd_->SwitchToConnectUI();
}
}
void Conductor::OnPeerConnected(int id, const std::string& name) {
LOG(INFO) << __FUNCTION__;
// Refresh the list if we're showing it.
if (main_wnd_->current_ui() == MainWnd::LIST_PEERS)
if (main_wnd_->current_ui() == MainWindow::LIST_PEERS)
main_wnd_->SwitchToPeerList(client_->peers());
}
void Conductor::OnPeerDisconnected(int id, const std::string& name) {
void Conductor::OnPeerDisconnected(int id) {
LOG(INFO) << __FUNCTION__;
if (id == peer_id_) {
LOG(INFO) << "Our peer disconnected";
peer_id_ = -1;
if (peer_connection_.get())
peer_connection_->Close();
main_wnd_->QueueUIThreadCallback(PEER_CONNECTION_CLOSED, NULL);
} else {
// Refresh the list if we're showing it.
if (main_wnd_->current_ui() == MainWindow::LIST_PEERS)
main_wnd_->SwitchToPeerList(client_->peers());
}
// Refresh the list if we're showing it.
if (main_wnd_->current_ui() == MainWnd::LIST_PEERS)
main_wnd_->SwitchToPeerList(client_->peers());
}
void Conductor::OnMessageFromPeer(int peer_id, const std::string& message) {
ASSERT(peer_id_ == peer_id || peer_id_ == -1);
ASSERT(!message.empty());
if (handshake_ == NONE) {
handshake_ = OFFER_RECEIVED;
if (!peer_connection_.get()) {
ASSERT(peer_id_ == -1);
peer_id_ = peer_id;
if (!peer_connection_.get()) {
// Got an offer. Give it to the PeerConnection instance.
// Once processed, we will get a callback to OnSignalingMessage with
// our 'answer' which we'll send to the peer.
LOG(INFO) << "Got an offer from our peer: " << peer_id;
if (!InitializePeerConnection()) {
LOG(LS_ERROR) << "Failed to initialize our PeerConnection instance";
client_->SignOut();
return;
}
// Got an offer. Give it to the PeerConnection instance.
// Once processed, we will get a callback to OnSignalingMessage with
// our 'answer' which we'll send to the peer.
LOG(INFO) << "Got an offer from our peer: " << peer_id;
if (!InitializePeerConnection()) {
LOG(LS_ERROR) << "Failed to initialize our PeerConnection instance";
client_->SignOut();
return;
} else {
StartCaptureDevice();
}
} else if (handshake_ == INITIATOR) {
LOG(INFO) << "Remote peer sent us an answer";
handshake_ = ANSWER_RECEIVED;
} else if (peer_id != peer_id_) {
ASSERT(peer_id_ != -1);
LOG(WARNING) << "Received an offer from a peer while already in a "
"conversation with a different peer.";
return;
}
peer_connection_->SignalingMessage(message);
}
if (handshake_ == QUIT_SENT) {
DisconnectFromCurrentPeer();
}
void Conductor::OnMessageSent(int err) {
// Process the next pending message if any.
main_wnd_->QueueUIThreadCallback(SEND_MESSAGE_TO_PEER, NULL);
}
//
// MainWndCallback implementation.
//
void Conductor::StartLogin(const std::string& server, int port) {
ASSERT(!client_->is_connected());
bool Conductor::StartLogin(const std::string& server, int port) {
if (client_->is_connected())
return false;
if (!client_->Connect(server, port, GetPeerName())) {
MessageBoxA(main_wnd_->handle(),
("Failed to connect to " + server).c_str(),
"Error", MB_OK | MB_ICONERROR);
main_wnd_->MessageBox("Error", ("Failed to connect to " + server).c_str(),
true);
return false;
}
return true;
}
void Conductor::DisconnectFromServer() {
if (!client_->is_connected())
return;
client_->SignOut();
if (client_->is_connected())
client_->SignOut();
}
void Conductor::ConnectToPeer(int peer_id) {
ASSERT(peer_id_ == -1);
ASSERT(peer_id != -1);
ASSERT(handshake_ == NONE);
if (handshake_ != NONE)
if (peer_connection_.get()) {
main_wnd_->MessageBox("Error",
"We only support connecting to one peer at a time", true);
return;
}
if (InitializePeerConnection()) {
peer_id_ = peer_id;
main_wnd_->SwitchToStreamingUI();
OnInitialized(); // TODO(tommi): Figure out why we don't get this callback.
} else {
::MessageBoxA(main_wnd_->handle(), "Failed to initialize PeerConnection",
"Error", MB_OK | MB_ICONERROR);
main_wnd_->MessageBox("Error", "Failed to initialize PeerConnection", true);
}
}
void Conductor::AddStreams() {
waiting_for_video_ = peer_connection_->AddStream(kVideoLabel, true);
waiting_for_audio_ = peer_connection_->AddStream(kAudioLabel, false);
if (waiting_for_video_ || waiting_for_audio_)
handshake_ = INITIATOR;
ASSERT(waiting_for_video_ || waiting_for_audio_);
}
ASSERT(!waiting_for_video_);
ASSERT(!waiting_for_audio_);
void Conductor::PeerConnectionConnect() {
peer_connection_->Connect();
waiting_for_video_ = true;
waiting_for_audio_ = true;
if (!peer_connection_->AddStream(kVideoLabel, true))
waiting_for_video_ = false;
if (!peer_connection_->AddStream(kAudioLabel, false))
waiting_for_audio_ = false;
}
void Conductor::DisconnectFromCurrentPeer() {
LOG(INFO) << __FUNCTION__;
if (peer_connection_.get())
peer_connection_->Close();
DeletePeerConnection();
if (main_wnd_->IsWindow())
main_wnd_->SwitchToPeerList(client_->peers());
}
//
// Win32Window implementation.
//
bool Conductor::OnMessage(UINT msg, WPARAM wp, LPARAM lp,
LRESULT& result) { // NOLINT
bool ret = true;
if (msg == MEDIA_CHANNELS_INITIALIZED) {
ASSERT(handshake_ == INITIATOR);
bool ok = peer_connection_->Connect();
ASSERT(ok);
StartCaptureDevice();
// When we get an OnSignalingMessage notification, we'll send our
// json encoded signaling message to the peer, which is the first step
// of establishing a connection.
} else if (msg == PEER_CONNECTION_CLOSED) {
void Conductor::UIThreadCallback(int msg_id, void* data) {
if (msg_id == MEDIA_CHANNELS_INITIALIZED) {
bool ok = peer_connection_->Connect();
ASSERT(ok);
StartCaptureDevice();
// When we get an OnSignalingMessage notification, we'll send our
// json encoded signaling message to the peer, which is the first step
// of establishing a connection.
} else if (msg_id == PEER_CONNECTION_CLOSED) {
LOG(INFO) << "PEER_CONNECTION_CLOSED";
DeletePeerConnection();
::InvalidateRect(main_wnd_->handle(), NULL, TRUE);
waiting_for_audio_ = false;
waiting_for_video_ = false;
peer_id_ = -1;
ASSERT(video_channel_.empty());
ASSERT(audio_channel_.empty());
if (main_wnd_->IsWindow()) {
@@ -362,20 +361,34 @@ bool Conductor::OnMessage(UINT msg, WPARAM wp, LPARAM lp,
} else {
DisconnectFromServer();
}
} else if (msg == SEND_MESSAGE_TO_PEER) {
bool ok = client_->SendToPeer(peer_id_,
*reinterpret_cast<std::string*>(lp));
if (!ok) {
LOG(LS_ERROR) << "SendToPeer failed";
DisconnectFromServer();
}
} else if (msg == PEER_CONNECTION_ADDSTREAMS) {
AddStreams();
} else if (msg == PEER_CONNECTION_CONNECT) {
PeerConnectionConnect();
} else {
ret = false;
}
} else if (msg_id == SEND_MESSAGE_TO_PEER) {
LOG(INFO) << "SEND_MESSAGE_TO_PEER";
std::string* msg = reinterpret_cast<std::string*>(data);
if (client_->IsSendingMessage()) {
ASSERT(msg != NULL);
pending_messages_.push_back(msg);
} else {
if (!msg && !pending_messages_.empty()) {
msg = pending_messages_.front();
pending_messages_.pop_front();
}
if (msg) {
bool ok = client_->SendToPeer(peer_id_, *msg);
if (!ok && peer_id_ != -1) {
LOG(LS_ERROR) << "SendToPeer failed";
DisconnectFromServer();
}
delete msg;
}
return ret;
if (!peer_connection_.get())
peer_id_ = -1;
}
} else if (msg_id == PEER_CONNECTION_ADDSTREAMS) {
AddStreams();
} else if (msg_id == PEER_CONNECTION_CONNECT) {
peer_connection_->Connect();
} else if (msg_id == PEER_CONNECTION_ERROR) {
main_wnd_->MessageBox("Error", "an unknown error occurred", true);
}
}