From 33f22d0d04b8f26f3bf89a42ca093eab766b888e Mon Sep 17 00:00:00 2001 From: bebopagogo Date: Sat, 15 Mar 2014 10:48:12 -0400 Subject: [PATCH] added norm_engine --- configure.ac | 22 ++++++++++++++++++++++ src/Makefile.am | 2 ++ src/session_base.cpp | 38 ++++++++++++++++++++++++++++++++++++-- src/socket_base.cpp | 24 +++++++++++++++++------- 4 files changed, 77 insertions(+), 9 deletions(-) diff --git a/configure.ac b/configure.ac index 963a6ead..96463fc1 100644 --- a/configure.ac +++ b/configure.ac @@ -463,6 +463,28 @@ fi AC_SUBST(pgm_basename) + +# This uses "--with-norm" to point to the "norm" directory +# for "norm/include" and "norm/lib" +#(if "--with-norm=yes" is given, then assume installed on system) +AC_ARG_WITH([norm], [AS_HELP_STRING([--with-norm], + [build libzmq with NORM protocol extension, optionally specifying norm path [default=no]])], + [with_norm_ext=$withval], [with_norm_ext=no]) + + +AC_MSG_CHECKING("with_norm_ext = ${with_norm_ext}") + +if test "x$with_norm_ext" != "xno"; then + AC_DEFINE(ZMQ_HAVE_NORM, 1, [Have NORM protocol extension]) + if test "x$wwith_norm_ext" != "xyes"; then + norm_path="${with_norm_ext}" + LIBZMQ_EXTRA_CXXFLAGS="-I${norm_path}/include ${LIBZMQ_EXTRA_CXXFLAGS}" + LIBZMQ_EXTRA_LDFLAGS="-I${norm_path}/include ${LIBZMQ_EXTRA_LDFLAGS}" + fi + LIBS="-lnorm $LIBS" +fi + + # Set -Wall, -Werror and -pedantic AC_LANG_PUSH([C++]) diff --git a/src/Makefile.am b/src/Makefile.am index 94b53a85..53390b20 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -45,6 +45,7 @@ libzmq_la_SOURCES = \ msg.hpp \ mtrie.hpp \ mutex.hpp \ + norm_engine.hpp \ null_mechanism.hpp \ object.hpp \ options.hpp \ @@ -112,6 +113,7 @@ libzmq_la_SOURCES = \ mechanism.cpp \ msg.cpp \ mtrie.cpp \ + norm_engine.cpp \ null_mechanism.cpp \ object.cpp \ options.cpp \ diff --git a/src/session_base.cpp b/src/session_base.cpp index 442e05de..7c7650ae 100644 --- a/src/session_base.cpp +++ b/src/session_base.cpp @@ -28,6 +28,7 @@ #include "pgm_sender.hpp" #include "pgm_receiver.hpp" #include "address.hpp" +#include "norm_engine.hpp" #include "ctx.hpp" #include "req.hpp" @@ -449,8 +450,9 @@ void zmq::session_base_t::reconnect () { // For delayed connect situations, terminate the pipe // and reestablish later on - if (pipe && options.immediate == 1 - && addr->protocol != "pgm" && addr->protocol != "epgm") { + if (pipe && 1 == options.immediate == 1 + && addr->protocol != "pgm" && addr->protocol != "epgm" + && addr->protocol != "norm") { pipe->hiccup (); pipe->terminate (false); terminating_pipes.insert (pipe); @@ -549,6 +551,38 @@ void zmq::session_base_t::start_connecting (bool wait_) return; } #endif + +#ifdef ZMQ_HAVE_NORM + if (addr->protocol == "norm") + { + // At this point we'll create message pipes to the session straight + // away. There's no point in delaying it as no concept of 'connect' + // exists with NORM anyway. + if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB) { + + // NORM sender. + norm_engine_t* norm_sender = new (std::nothrow) norm_engine_t(io_thread, options); + alloc_assert (norm_sender); + + int rc = norm_sender->init (addr->address.c_str (), true, false); + errno_assert (rc == 0); + + send_attach (this, norm_sender); + } + else { // ZMQ_SUB or ZMQ_XSUB + + // NORM receiver. + norm_engine_t* norm_receiver = new (std::nothrow) norm_engine_t (io_thread, options); + alloc_assert (norm_receiver); + + int rc = norm_receiver->init (addr->address.c_str (), false, true); + errno_assert (rc == 0); + + send_attach (this, norm_receiver); + } + return; + } +#endif // ZMQ_HAVE_NORM zmq_assert (false); } diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 630e7981..d467ceb8 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -190,11 +190,11 @@ int zmq::socket_base_t::check_protocol (const std::string &protocol_) { // First check out whether the protcol is something we are aware of. if (protocol_ != "inproc" && protocol_ != "ipc" && protocol_ != "tcp" && - protocol_ != "pgm" && protocol_ != "epgm" && protocol_ != "tipc") { + protocol_ != "pgm" && protocol_ != "epgm" && protocol_ != "tipc" && + protocol_ != "norm") { errno = EPROTONOSUPPORT; return -1; } - // If 0MQ is not compiled with OpenPGM, pgm and epgm transports // are not avaialble. #if !defined ZMQ_HAVE_OPENPGM @@ -203,6 +203,13 @@ int zmq::socket_base_t::check_protocol (const std::string &protocol_) return -1; } #endif + +#if !defined ZMQ_HAVE_NORM + if (protocol_ == "norm") { + errno = EPROTONOSUPPORT; + return -1; + } +#endif // !ZMQ_HAVE_NORM // IPC transport is not available on Windows and OpenVMS. #if defined ZMQ_HAVE_WINDOWS || defined ZMQ_HAVE_OPENVMS @@ -224,7 +231,7 @@ int zmq::socket_base_t::check_protocol (const std::string &protocol_) // Check whether socket type and transport protocol match. // Specifically, multicast protocols can't be combined with // bi-directional messaging patterns (socket types). - if ((protocol_ == "pgm" || protocol_ == "epgm") && + if ((protocol_ == "pgm" || protocol_ == "epgm" || protocol_ == "norm") && options.type != ZMQ_PUB && options.type != ZMQ_SUB && options.type != ZMQ_XPUB && options.type != ZMQ_XSUB) { errno = ENOCOMPATPROTO; @@ -362,9 +369,9 @@ int zmq::socket_base_t::bind (const char *addr_) return rc; } - if (protocol == "pgm" || protocol == "epgm") { + if (protocol == "pgm" || protocol == "epgm" || protocol == "norm") { // For convenience's sake, bind can be used interchageable with - // connect for PGM and EPGM transports. + // connect for PGM, EPGM and NORM transports. return connect (addr_); } @@ -600,6 +607,9 @@ int zmq::socket_base_t::connect (const char *addr_) } } #endif + +// TBD - Should we check address for ZMQ_HAVE_NORM??? + #ifdef ZMQ_HAVE_OPENPGM if (protocol == "pgm" || protocol == "epgm") { struct pgm_addrinfo_t *res = NULL; @@ -630,8 +640,8 @@ int zmq::socket_base_t::connect (const char *addr_) errno_assert (session); // PGM does not support subscription forwarding; ask for all data to be - // sent to this pipe. - bool subscribe_to_all = protocol == "pgm" || protocol == "epgm"; + // sent to this pipe. (same for NORM, currently?) + bool subscribe_to_all = protocol == "pgm" || protocol == "epgm" || protocol == "norm"; pipe_t *newpipe = NULL; if (options.immediate != 1 || subscribe_to_all) {