added OpenPGM sender - ZMQ_PUB

This commit is contained in:
malosek
2009-09-11 17:58:37 +02:00
parent e940878b3f
commit 1a4d6f9119
14 changed files with 1361 additions and 21 deletions

View File

@@ -17,6 +17,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <iostream>
#include <string>
#include <algorithm>
@@ -35,9 +37,12 @@
#include "uuid.hpp"
#include "pipe.hpp"
#include "err.hpp"
#include "platform.hpp"
#include "pgm_sender.hpp"
zmq::socket_base_t::socket_base_t (app_thread_t *parent_) :
zmq::socket_base_t::socket_base_t (app_thread_t *parent_, int type_) :
object_t (parent_),
type (type_),
current (0),
active (0),
pending_term_acks (0),
@@ -145,6 +150,22 @@ int zmq::socket_base_t::setsockopt (int option_, const void *optval_,
errno = EFAULT;
return -1;
case ZMQ_RATE:
if (optvallen_ != sizeof (uint32_t)) {
errno = EINVAL;
return -1;
}
options.rate = *((int32_t*) optval_);
return 0;
case ZMQ_RECOVERY_IVL:
if (optvallen_ != sizeof (uint32_t)) {
errno = EINVAL;
return -1;
}
options.recovery_ivl = *((int32_t*) optval_);
return 0;
default:
errno = EINVAL;
return -1;
@@ -170,6 +191,21 @@ int zmq::socket_base_t::connect (const char *addr_)
std::string session_name ("#");
session_name += uuid_t ().to_string ();
// Parse addr_ string.
std::string addr_type;
std::string addr_args;
std::string addr (addr_);
std::string::size_type pos = addr.find ("://");
if (pos == std::string::npos) {
errno = EINVAL;
return -1;
}
addr_type = addr.substr (0, pos);
addr_args = addr.substr (pos + 3);
// Create the session.
io_thread_t *io_thread = choose_io_thread (options.affinity);
session_t *session = new session_t (io_thread, this, session_name.c_str (),
@@ -198,20 +234,63 @@ int zmq::socket_base_t::connect (const char *addr_)
send_plug (session);
send_own (this, session);
// Create the connecter object. Supply it with the session name so that
// it can bind the new connection to the session once it is established.
zmq_connecter_t *connecter = new zmq_connecter_t (
choose_io_thread (options.affinity), this, options,
session_name.c_str ());
int rc = connecter->set_address (addr_);
if (rc != 0) {
delete connecter;
return -1;
}
send_plug (connecter);
send_own (this, connecter);
if (addr_type == "tcp") {
return 0;
// Create the connecter object. Supply it with the session name so that
// it can bind the new connection to the session once it is established.
zmq_connecter_t *connecter = new zmq_connecter_t (
choose_io_thread (options.affinity), this, options,
session_name.c_str ());
int rc = connecter->set_address (addr_args.c_str ());
if (rc != 0) {
delete connecter;
return -1;
}
send_plug (connecter);
send_own (this, connecter);
return 0;
}
#if defined ZMQ_HAVE_OPENPGM
if (addr_type == "pgm") {
switch (type) {
case ZMQ_PUB:
{
pgm_sender_t *pgm_sender =
new pgm_sender_t (choose_io_thread (options.affinity), options,
session_name.c_str ());
int rc = pgm_sender->init (addr_args.c_str ());
if (rc != 0) {
delete pgm_sender;
return -1;
}
// Reserve a sequence number for following 'attach' command.
session->inc_seqnum ();
send_attach (session, pgm_sender);
pgm_sender = NULL;
break;
}
case ZMQ_SUB:
zmq_assert (false);
break;
default:
errno = EINVAL;
return -1;
}
return 0;
}
#endif
// Unknown address type.
errno = ENOTSUP;
return -1;
}
int zmq::socket_base_t::send (::zmq_msg_t *msg_, int flags_)