diff --git a/include/zmq.h b/include/zmq.h index 32bf8cce..87145d7d 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -397,7 +397,7 @@ ZMQ_EXPORT int zmq_remove_pollfd (void *s, void *p); typedef struct zmq_pollitem_t { - void *socket; + void *socket; #if defined _WIN32 SOCKET fd; #else @@ -426,7 +426,7 @@ ZMQ_EXPORT SOCKET zmq_pollfd_fd (void *p); #else ZMQ_EXPORT int zmq_pollfd_fd (void *p); #endif - + /******************************************************************************/ /* Message proxying */ /******************************************************************************/ @@ -517,8 +517,8 @@ ZMQ_EXPORT void zmq_threadclose (void* thread); #define ZMQ_UNUSED(object) (void)object #define LIBZMQ_DELETE(p_object) {\ - delete p_object; \ - p_object = 0; \ + delete p_object; \ + p_object = 0; \ } #undef ZMQ_EXPORT diff --git a/src/blob.hpp b/src/blob.hpp index 689b60b2..cbc7a619 100644 --- a/src/blob.hpp +++ b/src/blob.hpp @@ -45,10 +45,10 @@ namespace std { typedef unsigned char char_type; // Unsigned as wint_t in unsigned. - typedef unsigned long int_type; - typedef streampos pos_type; - typedef streamoff off_type; - typedef mbstate_t state_type; + typedef unsigned long int_type; + typedef streampos pos_type; + typedef streamoff off_type; + typedef mbstate_t state_type; static void assign(char_type& __c1, const char_type& __c2) diff --git a/src/clock.cpp b/src/clock.cpp index bb307401..930a9369 100644 --- a/src/clock.cpp +++ b/src/clock.cpp @@ -149,16 +149,16 @@ uint64_t zmq::clock_t::now_us () // Use POSIX clock_gettime function to get precise monotonic time. struct timespec tv; int rc = clock_gettime (CLOCK_MONOTONIC, &tv); - // Fix case where system has clock_gettime but CLOCK_MONOTONIC is not supported. - // This should be a configuration check, but I looked into it and writing an - // AC_FUNC_CLOCK_MONOTONIC seems beyond my powers. - if( rc != 0) { - // Use POSIX gettimeofday function to get precise time. - struct timeval tv; - int rc = gettimeofday (&tv, NULL); - errno_assert (rc == 0); - return (tv.tv_sec * (uint64_t) 1000000 + tv.tv_usec); - } + // Fix case where system has clock_gettime but CLOCK_MONOTONIC is not supported. + // This should be a configuration check, but I looked into it and writing an + // AC_FUNC_CLOCK_MONOTONIC seems beyond my powers. + if( rc != 0) { + // Use POSIX gettimeofday function to get precise time. + struct timeval tv; + int rc = gettimeofday (&tv, NULL); + errno_assert (rc == 0); + return (tv.tv_sec * (uint64_t) 1000000 + tv.tv_usec); + } return (tv.tv_sec * (uint64_t) 1000000 + tv.tv_nsec / 1000); #elif defined HAVE_GETHRTIME diff --git a/src/condition_variable.hpp b/src/condition_variable.hpp index 6c588bfb..4e20b401 100644 --- a/src/condition_variable.hpp +++ b/src/condition_variable.hpp @@ -47,36 +47,36 @@ namespace zmq { - class condition_variable_t - { - public: - inline condition_variable_t () - { - zmq_assert(false); - } + class condition_variable_t + { + public: + inline condition_variable_t () + { + zmq_assert(false); + } - inline ~condition_variable_t () - { + inline ~condition_variable_t () + { - } + } - inline int wait (mutex_t* mutex_, int timeout_ ) - { - zmq_assert(false); - return -1; - } + inline int wait (mutex_t* mutex_, int timeout_ ) + { + zmq_assert(false); + return -1; + } - inline void broadcast () - { - zmq_assert(false); - } + inline void broadcast () + { + zmq_assert(false); + } - private: + private: - // Disable copy construction and assignment. - condition_variable_t (const condition_variable_t&); - void operator = (const condition_variable_t&); - }; + // Disable copy construction and assignment. + condition_variable_t (const condition_variable_t&); + void operator = (const condition_variable_t&); + }; } @@ -95,7 +95,7 @@ namespace zmq inline ~condition_variable_t () { - + } inline int wait (mutex_t* mutex_, int timeout_ ) @@ -110,7 +110,7 @@ namespace zmq if (rc != ERROR_TIMEOUT) win_assert(rc); - errno = EAGAIN; + errno = EAGAIN; return -1; } @@ -161,9 +161,9 @@ namespace zmq if (timeout_ != -1) { struct timespec timeout; clock_gettime(CLOCK_REALTIME, &timeout); - + timeout.tv_sec += timeout_ / 1000; - timeout.tv_nsec += (timeout_ % 1000) * 1000000; + timeout.tv_nsec += (timeout_ % 1000) * 1000000; rc = pthread_cond_timedwait (&cond, mutex_->get_mutex (), &timeout); } else diff --git a/src/ctx.hpp b/src/ctx.hpp index 4dfd062a..970207c6 100644 --- a/src/ctx.hpp +++ b/src/ctx.hpp @@ -86,7 +86,7 @@ namespace zmq // (except zmq_close). // This function is non-blocking. // terminate must still be called afterwards. - // This function is optional, terminate will unblock any current + // This function is optional, terminate will unblock any current // operations as well. int shutdown(); @@ -98,7 +98,7 @@ namespace zmq zmq::socket_base_t *create_socket (int type_); void destroy_socket (zmq::socket_base_t *socket_); - // Start a new thread with proper scheduling parameters. + // Start a new thread with proper scheduling parameters. void start_thread (thread_t &thread_, thread_fn *tfn_, void *arg_) const; // Send command to the destination thread. @@ -203,7 +203,7 @@ namespace zmq // Is IPv6 enabled on this context? bool ipv6; - // Thread scheduling parameters. + // Thread scheduling parameters. int thread_priority; int thread_sched_policy; diff --git a/src/gssapi_mechanism_base.cpp b/src/gssapi_mechanism_base.cpp index bdd18361..aad0e5e9 100644 --- a/src/gssapi_mechanism_base.cpp +++ b/src/gssapi_mechanism_base.cpp @@ -178,7 +178,7 @@ int zmq::gssapi_mechanism_base_t::decode_message (msg_t *msg_) const uint8_t flags = static_cast (plaintext.value)[0]; if (flags & 0x01) - msg_->set_flags (msg_t::more); + msg_->set_flags (msg_t::more); if (flags & 0x02) msg_->set_flags (msg_t::command); diff --git a/src/gssapi_server.cpp b/src/gssapi_server.cpp index 97f3f9b5..85922337 100644 --- a/src/gssapi_server.cpp +++ b/src/gssapi_server.cpp @@ -123,7 +123,7 @@ int zmq::gssapi_server_t::process_handshake_command (msg_t *msg_) } if (security_context_established) { - // Use ZAP protocol (RFC 27) to authenticate the user. + // Use ZAP protocol (RFC 27) to authenticate the user. bool expecting_zap_reply = false; int rc = session->zap_connect (); if (rc == 0) { diff --git a/src/gssapi_server.hpp b/src/gssapi_server.hpp index c3782230..62d1196a 100644 --- a/src/gssapi_server.hpp +++ b/src/gssapi_server.hpp @@ -85,8 +85,8 @@ namespace zmq void accept_context (); int produce_next_token (msg_t *msg_); int process_next_token (msg_t *msg_); - void send_zap_request (); - int receive_and_process_zap_reply(); + void send_zap_request (); + int receive_and_process_zap_reply(); }; } diff --git a/src/mtrie.cpp b/src/mtrie.cpp index 545dc21d..e3093df6 100644 --- a/src/mtrie.cpp +++ b/src/mtrie.cpp @@ -96,7 +96,7 @@ bool zmq::mtrie_t::add_helper (unsigned char *prefix_, size_t size_, count = 1; next.node = NULL; } - else + else if (count == 1) { unsigned char oldc = min; mtrie_t *oldp = next.node; @@ -109,7 +109,7 @@ bool zmq::mtrie_t::add_helper (unsigned char *prefix_, size_t size_, min = std::min (min, c); next.table [oldc - min] = oldp; } - else + else if (min < c) { // The new character is above the current character range. unsigned short old_count = count; @@ -252,7 +252,7 @@ void zmq::mtrie_t::rm_helper (pipe_t *pipe_, unsigned char **buff_, count = 0; } // Compact the node table if possible - else + else if (live_nodes == 1) { // If there's only one live node in the table we can // switch to using the more compact single-node @@ -412,16 +412,16 @@ void zmq::mtrie_t::match (unsigned char *data_, size_t size_, break; // If there's one subnode (optimisation). - if (current->count == 1) { + if (current->count == 1) { if (data_ [0] != current->min) break; current = current->next.node; data_++; size_--; - continue; - } + continue; + } - // If there are multiple subnodes. + // If there are multiple subnodes. if (data_ [0] < current->min || data_ [0] >= current->min + current->count) break; diff --git a/src/pgm_socket.cpp b/src/pgm_socket.cpp index 2557c668..7e02724b 100644 --- a/src/pgm_socket.cpp +++ b/src/pgm_socket.cpp @@ -82,7 +82,7 @@ int zmq::pgm_socket_t::init_address (const char *network_, } *port_number = atoi (port_delim + 1); - + char network [256]; if (port_delim - network_ >= (int) sizeof (network) - 1) { errno = EINVAL; @@ -195,24 +195,24 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_) } { - const int rcvbuf = (int) options.rcvbuf; - if (rcvbuf >= 0) { - if (!pgm_setsockopt (sock, SOL_SOCKET, SO_RCVBUF, &rcvbuf, - sizeof (rcvbuf))) - goto err_abort; - } + const int rcvbuf = (int) options.rcvbuf; + if (rcvbuf >= 0) { + if (!pgm_setsockopt (sock, SOL_SOCKET, SO_RCVBUF, &rcvbuf, + sizeof (rcvbuf))) + goto err_abort; + } - const int sndbuf = (int) options.sndbuf; - if (sndbuf >= 0) { - if (!pgm_setsockopt (sock, SOL_SOCKET, SO_SNDBUF, &sndbuf, - sizeof (sndbuf))) - goto err_abort; - } + const int sndbuf = (int) options.sndbuf; + if (sndbuf >= 0) { + if (!pgm_setsockopt (sock, SOL_SOCKET, SO_SNDBUF, &sndbuf, + sizeof (sndbuf))) + goto err_abort; + } - const int max_tpdu = (int) pgm_max_tpdu; - if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_MTU, &max_tpdu, - sizeof (max_tpdu))) - goto err_abort; + const int max_tpdu = (int) pgm_max_tpdu; + if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_MTU, &max_tpdu, + sizeof (max_tpdu))) + goto err_abort; } if (receiver) { @@ -334,28 +334,28 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_) // Set IP level parameters. { - // Multicast loopback disabled by default - const int multicast_loop = 0; - if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_MULTICAST_LOOP, - &multicast_loop, sizeof (multicast_loop))) - goto err_abort; + // Multicast loopback disabled by default + const int multicast_loop = 0; + if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_MULTICAST_LOOP, + &multicast_loop, sizeof (multicast_loop))) + goto err_abort; - const int multicast_hops = options.multicast_hops; - if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_MULTICAST_HOPS, - &multicast_hops, sizeof (multicast_hops))) - goto err_abort; + const int multicast_hops = options.multicast_hops; + if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_MULTICAST_HOPS, + &multicast_hops, sizeof (multicast_hops))) + goto err_abort; - // Expedited Forwarding PHB for network elements, no ECN. - // Ignore return value due to varied runtime support. - const int dscp = 0x2e << 2; - if (AF_INET6 != sa_family) - pgm_setsockopt (sock, IPPROTO_PGM, PGM_TOS, - &dscp, sizeof (dscp)); + // Expedited Forwarding PHB for network elements, no ECN. + // Ignore return value due to varied runtime support. + const int dscp = 0x2e << 2; + if (AF_INET6 != sa_family) + pgm_setsockopt (sock, IPPROTO_PGM, PGM_TOS, + &dscp, sizeof (dscp)); - const int nonblocking = 1; - if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_NOBLOCK, - &nonblocking, sizeof (nonblocking))) - goto err_abort; + const int nonblocking = 1; + if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_NOBLOCK, + &nonblocking, sizeof (nonblocking))) + goto err_abort; } // Connect PGM transport to start state machine. @@ -402,13 +402,13 @@ zmq::pgm_socket_t::~pgm_socket_t () { if (pgm_msgv) free (pgm_msgv); - if (sock) + if (sock) pgm_close (sock, TRUE); } // Get receiver fds. receive_fd_ is signaled for incoming packets, // waiting_pipe_fd_ is signaled for state driven events and data. -void zmq::pgm_socket_t::get_receiver_fds (fd_t *receive_fd_, +void zmq::pgm_socket_t::get_receiver_fds (fd_t *receive_fd_, fd_t *waiting_pipe_fd_) { socklen_t socklen; @@ -430,12 +430,12 @@ void zmq::pgm_socket_t::get_receiver_fds (fd_t *receive_fd_, zmq_assert (socklen == sizeof (*waiting_pipe_fd_)); } -// Get fds and store them into user allocated memory. +// Get fds and store them into user allocated memory. // send_fd is for non-blocking send wire notifications. // receive_fd_ is for incoming back-channel protocol packets. // rdata_notify_fd_ is raised for waiting repair transmissions. // pending_notify_fd_ is for state driven events. -void zmq::pgm_socket_t::get_sender_fds (fd_t *send_fd_, fd_t *receive_fd_, +void zmq::pgm_socket_t::get_sender_fds (fd_t *send_fd_, fd_t *receive_fd_, fd_t *rdata_notify_fd_, fd_t *pending_notify_fd_) { socklen_t socklen; @@ -475,7 +475,7 @@ void zmq::pgm_socket_t::get_sender_fds (fd_t *send_fd_, fd_t *receive_fd_, size_t zmq::pgm_socket_t::send (unsigned char *data_, size_t data_len_) { size_t nbytes = 0; - + const int status = pgm_send (sock, data_, data_len_, &nbytes); // We have to write all data as one packet. @@ -551,7 +551,7 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_) { size_t raw_data_len = 0; - // We just sent all data from pgm_transport_recvmsgv up + // We just sent all data from pgm_transport_recvmsgv up // and have to return 0 that another engine in this thread is scheduled. if (nbytes_rec == nbytes_processed && nbytes_rec > 0) { @@ -572,7 +572,7 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_) zmq_assert (nbytes_processed == 0); zmq_assert (nbytes_rec == 0); - // Receive a vector of Application Protocol Domain Unit's (APDUs) + // Receive a vector of Application Protocol Domain Unit's (APDUs) // from the transport. pgm_error_t *pgm_error = NULL; @@ -590,7 +590,7 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_) zmq_assert (nbytes_rec == 0); - // In case if no RDATA/ODATA caused POLLIN 0 is + // In case if no RDATA/ODATA caused POLLIN 0 is // returned. nbytes_rec = 0; errno = EBUSY; @@ -646,8 +646,8 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_, const pgm_tsi_t **tsi_) // Only one APDU per pgm_msgv_t structure is allowed. zmq_assert (pgm_msgv [pgm_msgv_processed].msgv_len == 1); - - struct pgm_sk_buff_t* skb = + + struct pgm_sk_buff_t* skb = pgm_msgv [pgm_msgv_processed].msgv_skb [0]; // Take pointers from pgm_msgv_t structure. @@ -679,7 +679,7 @@ void zmq::pgm_socket_t::process_upstream () zmq_assert (status != PGM_IO_STATUS_ERROR); // No data should be returned. - zmq_assert (dummy_bytes == 0 && (status == PGM_IO_STATUS_TIMER_PENDING || + zmq_assert (dummy_bytes == 0 && (status == PGM_IO_STATUS_TIMER_PENDING || status == PGM_IO_STATUS_RATE_LIMITED || status == PGM_IO_STATUS_WOULD_BLOCK)); @@ -698,7 +698,7 @@ int zmq::pgm_socket_t::compute_sqns (int tpdu_) { // Convert rate into B/ms. uint64_t rate = uint64_t (options.rate) / 8; - + // Compute the size of the buffer in bytes. uint64_t size = uint64_t (options.recovery_ivl) * rate; diff --git a/src/pipe.cpp b/src/pipe.cpp index 6c335825..182011b9 100644 --- a/src/pipe.cpp +++ b/src/pipe.cpp @@ -515,13 +515,13 @@ void zmq::pipe_t::set_hwms (int inhwm_, int outhwm_) // if either send or recv side has hwm <= 0 it means infinite so we should set hwms infinite if (inhwm_ <= 0 || inhwmboost <= 0) - in = 0; - - if (outhwm_ <= 0 || outhwmboost <= 0) - out = 0; + in = 0; - lwm = compute_lwm(in); - hwm = out; + if (outhwm_ <= 0 || outhwmboost <= 0) + out = 0; + + lwm = compute_lwm(in); + hwm = out; } void zmq::pipe_t::set_hwms_boost(int inhwmboost_, int outhwmboost_) diff --git a/src/socket_base.cpp b/src/socket_base.cpp index e63af9ba..e29a6d2a 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -299,7 +299,7 @@ void zmq::socket_base_t::attach_pipe (pipe_t *pipe_, bool subscribe_to_all_) // First, register the pipe so that we can terminate it later on. pipe_->set_event_sink (this); pipes.push_back (pipe_); - + // Let the derived socket type know about new pipe. xattach_pipe (pipe_, subscribe_to_all_); @@ -316,12 +316,11 @@ int zmq::socket_base_t::setsockopt (int option_, const void *optval_, { ENTER_MUTEX(); - if (!options.is_valid(option_)) { - errno = EINVAL; - EXIT_MUTEX(); - return -1; - } - + if (!options.is_valid(option_)) { + errno = EINVAL; + EXIT_MUTEX(); + return -1; + } if (unlikely (ctx_terminated)) { errno = ETERM; @@ -339,7 +338,7 @@ int zmq::socket_base_t::setsockopt (int option_, const void *optval_, // If the socket type doesn't support the option, pass it to // the generic option parser. rc = options.setsockopt (option_, optval_, optvallen_); - update_pipe_options(option_); + update_pipe_options(option_); EXIT_MUTEX(); return rc; @@ -382,10 +381,10 @@ int zmq::socket_base_t::getsockopt (int option_, void *optval_, EXIT_MUTEX(); return -1; } - + *((fd_t*)optval_) = ((mailbox_t*)mailbox)->get_fd(); - *optvallen_ = sizeof(fd_t); - + *optvallen_ = sizeof(fd_t); + EXIT_MUTEX(); return 0; } @@ -809,9 +808,9 @@ int zmq::socket_base_t::connect (const char *addr_) } } #endif - + // TBD - Should we check address for ZMQ_HAVE_NORM??? - + #ifdef ZMQ_HAVE_OPENPGM if (protocol == "pgm" || protocol == "epgm") { struct pgm_addrinfo_t *res = NULL; @@ -1027,7 +1026,7 @@ int zmq::socket_base_t::send (msg_t *msg_, int flags_) if (unlikely (process_commands (timeout, false) != 0)) { EXIT_MUTEX(); return -1; - } + } rc = xsend (msg_); if (rc == 0) break; @@ -1167,7 +1166,7 @@ int zmq::socket_base_t::close () { // Mark the socket as dead tag = 0xdeadbeef; - + // Transfer the ownership of the socket from this application thread // to the reaper thread which will take care of the rest of shutdown // process. @@ -1195,13 +1194,13 @@ void zmq::socket_base_t::start_reaping (poller_t *poller_) if (!thread_safe) fd = ((mailbox_t*)mailbox)->get_fd(); - else { + else { ENTER_MUTEX(); reaper_signaler = new signaler_t(); // Add signaler to the safe mailbox - fd = reaper_signaler->get_fd(); + fd = reaper_signaler->get_fd(); ((mailbox_safe_t*)mailbox)->add_signaler(reaper_signaler); // Send a signal to make sure reaper handle existing commands @@ -1308,13 +1307,13 @@ void zmq::socket_base_t::process_term (int linger_) void zmq::socket_base_t::update_pipe_options(int option_) { - if (option_ == ZMQ_SNDHWM || option_ == ZMQ_RCVHWM) - { - for (pipes_t::size_type i = 0; i != pipes.size(); ++i) - { - pipes[i]->set_hwms(options.rcvhwm, options.sndhwm); - } - } + if (option_ == ZMQ_SNDHWM || option_ == ZMQ_RCVHWM) + { + for (pipes_t::size_type i = 0; i != pipes.size(); ++i) + { + pipes[i]->set_hwms(options.rcvhwm, options.sndhwm); + } + } } @@ -1378,11 +1377,11 @@ void zmq::socket_base_t::in_event () // be destroyed. ENTER_MUTEX(); - + // If the socket is thread safe we need to unsignal the reaper signaler if (thread_safe) reaper_signaler->recv(); - + process_commands (0, false); EXIT_MUTEX(); check_destroy (); diff --git a/src/thread.cpp b/src/thread.cpp index 666489c9..d013737d 100644 --- a/src/thread.cpp +++ b/src/thread.cpp @@ -36,7 +36,7 @@ extern "C" { #if defined _WIN32_WCE - static DWORD thread_routine (LPVOID arg_) + static DWORD thread_routine (LPVOID arg_) #else static unsigned int __stdcall thread_routine (void *arg_) #endif @@ -58,7 +58,7 @@ void zmq::thread_t::start (thread_fn *tfn_, void *arg_) descriptor = (HANDLE) _beginthreadex (NULL, 0, &::thread_routine, this, 0 , NULL); #endif - win_assert (descriptor != NULL); + win_assert (descriptor != NULL); } void zmq::thread_t::stop () @@ -92,7 +92,7 @@ extern "C" posix_assert (rc); #endif - zmq::thread_t *self = (zmq::thread_t*) arg_; + zmq::thread_t *self = (zmq::thread_t*) arg_; self->tfn (self->arg); return NULL; } diff --git a/src/xpub.cpp b/src/xpub.cpp index b98a7ca2..af24f062 100644 --- a/src/xpub.cpp +++ b/src/xpub.cpp @@ -40,39 +40,39 @@ zmq::xpub_t::xpub_t (class ctx_t *parent_, uint32_t tid_, int sid_) : verbose_unsubs (false), more (false), lossy (true), - manual(false), - welcome_msg () + manual(false), + welcome_msg () { - last_pipe = NULL; - options.type = ZMQ_XPUB; - welcome_msg.init(); + last_pipe = NULL; + options.type = ZMQ_XPUB; + welcome_msg.init(); } zmq::xpub_t::~xpub_t () { - welcome_msg.close(); + welcome_msg.close(); } void zmq::xpub_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_) { zmq_assert (pipe_); dist.attach (pipe_); - + // If subscribe_to_all_ is specified, the caller would like to subscribe // to all data on this pipe, implicitly. if (subscribe_to_all_) - subscriptions.add (NULL, 0, pipe_); + subscriptions.add (NULL, 0, pipe_); - // if welcome message exist - if (welcome_msg.size() > 0) - { - msg_t copy; - copy.init(); - copy.copy(welcome_msg); + // if welcome message exist + if (welcome_msg.size() > 0) + { + msg_t copy; + copy.init(); + copy.copy(welcome_msg); - pipe_->write(©); - pipe_->flush(); - } + pipe_->write(©); + pipe_->flush(); + } // The pipe is active when attached. Let's read the subscriptions from // it, if any. @@ -87,32 +87,32 @@ void zmq::xpub_t::xread_activated (pipe_t *pipe_) // Apply the subscription to the trie unsigned char *const data = (unsigned char *) sub.data (); const size_t size = sub.size (); - if (size > 0 && (*data == 0 || *data == 1)) { - if (manual) - { - last_pipe = pipe_; - pending_data.push_back(blob_t(data, size)); + if (size > 0 && (*data == 0 || *data == 1)) { + if (manual) + { + last_pipe = pipe_; + pending_data.push_back(blob_t(data, size)); pending_metadata.push_back(sub.metadata()); - pending_flags.push_back(0); - } - else - { - bool unique; - if (*data == 0) - unique = subscriptions.rm(data + 1, size - 1, pipe_); - else - unique = subscriptions.add(data + 1, size - 1, pipe_); + pending_flags.push_back(0); + } + else + { + bool unique; + if (*data == 0) + unique = subscriptions.rm(data + 1, size - 1, pipe_); + else + unique = subscriptions.add(data + 1, size - 1, pipe_); - // If the (un)subscription is not a duplicate store it so that it can be - // passed to the user on next recv call unless verbose mode is enabled - // which makes to pass always these messages. - if (options.type == ZMQ_XPUB && (unique || (*data == 1 && verbose_subs) || - (*data == 0 && verbose_unsubs && verbose_subs))) { - pending_data.push_back(blob_t(data, size)); + // If the (un)subscription is not a duplicate store it so that it can be + // passed to the user on next recv call unless verbose mode is enabled + // which makes to pass always these messages. + if (options.type == ZMQ_XPUB && (unique || (*data == 1 && verbose_subs) || + (*data == 0 && verbose_unsubs && verbose_subs))) { + pending_data.push_back(blob_t(data, size)); pending_metadata.push_back(sub.metadata()); - pending_flags.push_back(0); - } - } + pending_flags.push_back(0); + } + } } else { // Process user message coming upstream from xsub socket @@ -131,46 +131,46 @@ void zmq::xpub_t::xwrite_activated (pipe_t *pipe_) int zmq::xpub_t::xsetsockopt (int option_, const void *optval_, size_t optvallen_) -{ - if (option_ == ZMQ_XPUB_VERBOSE || option_ == ZMQ_XPUB_VERBOSE_UNSUBSCRIBE || - option_ == ZMQ_XPUB_NODROP || option_ == ZMQ_XPUB_MANUAL) - { - if (optvallen_ != sizeof(int) || *static_cast (optval_) < 0) { - errno = EINVAL; - return -1; - } +{ + if (option_ == ZMQ_XPUB_VERBOSE || option_ == ZMQ_XPUB_VERBOSE_UNSUBSCRIBE || + option_ == ZMQ_XPUB_NODROP || option_ == ZMQ_XPUB_MANUAL) + { + if (optvallen_ != sizeof(int) || *static_cast (optval_) < 0) { + errno = EINVAL; + return -1; + } - if (option_ == ZMQ_XPUB_VERBOSE) - verbose_subs = (*static_cast (optval_) != 0); - else - if (option_ == ZMQ_XPUB_VERBOSE_UNSUBSCRIBE) - verbose_unsubs = (*static_cast (optval_) != 0); - else - if (option_ == ZMQ_XPUB_NODROP) - lossy = (*static_cast (optval_) == 0); - else - if (option_ == ZMQ_XPUB_MANUAL) - manual = (*static_cast (optval_) != 0); - } - else - if (option_ == ZMQ_SUBSCRIBE && manual && last_pipe != NULL) - subscriptions.add((unsigned char *)optval_, optvallen_, last_pipe); - else - if (option_ == ZMQ_UNSUBSCRIBE && manual && last_pipe != NULL) - subscriptions.rm((unsigned char *)optval_, optvallen_, last_pipe); - else - if (option_ == ZMQ_XPUB_WELCOME_MSG) { - welcome_msg.close(); + if (option_ == ZMQ_XPUB_VERBOSE) + verbose_subs = (*static_cast (optval_) != 0); + else + if (option_ == ZMQ_XPUB_VERBOSE_UNSUBSCRIBE) + verbose_unsubs = (*static_cast (optval_) != 0); + else + if (option_ == ZMQ_XPUB_NODROP) + lossy = (*static_cast (optval_) == 0); + else + if (option_ == ZMQ_XPUB_MANUAL) + manual = (*static_cast (optval_) != 0); + } + else + if (option_ == ZMQ_SUBSCRIBE && manual && last_pipe != NULL) + subscriptions.add((unsigned char *)optval_, optvallen_, last_pipe); + else + if (option_ == ZMQ_UNSUBSCRIBE && manual && last_pipe != NULL) + subscriptions.rm((unsigned char *)optval_, optvallen_, last_pipe); + else + if (option_ == ZMQ_XPUB_WELCOME_MSG) { + welcome_msg.close(); - if (optvallen_ > 0) { - welcome_msg.init_size(optvallen_); + if (optvallen_ > 0) { + welcome_msg.init_size(optvallen_); - unsigned char *data = (unsigned char*)welcome_msg.data(); - memcpy(data, optval_, optvallen_); - } - else - welcome_msg.init(); - } + unsigned char *data = (unsigned char*)welcome_msg.data(); + memcpy(data, optval_, optvallen_); + } + else + welcome_msg.init(); + } else { errno = EINVAL; return -1; @@ -211,7 +211,7 @@ int zmq::xpub_t::xsend (msg_t *msg_) int rc = -1; // Assume we fail if (lossy || dist.check_hwm ()) { if (dist.send_to_matching (msg_) == 0) { - // If we are at the end of multi-part message we can mark + // If we are at the end of multi-part message we can mark // all the pipes as non-matching. if (!msg_more) dist.unmatch (); @@ -244,11 +244,11 @@ int zmq::xpub_t::xrecv (msg_t *msg_) memcpy (msg_->data (), pending_data.front ().data (), pending_data.front ().size ()); - - // set metadata only if there is some - if (metadata_t* metadata = pending_metadata.front ()) { - msg_->set_metadata (metadata); - } + + // set metadata only if there is some + if (metadata_t* metadata = pending_metadata.front ()) { + msg_->set_metadata (metadata); + } msg_->set_flags (pending_flags.front ()); pending_data.pop_front (); diff --git a/src/xpub.hpp b/src/xpub.hpp index 1515529a..9508b1a0 100644 --- a/src/xpub.hpp +++ b/src/xpub.hpp @@ -96,14 +96,14 @@ namespace zmq // Drop messages if HWM reached, otherwise return with EAGAIN bool lossy; - // Subscriptions will not bed added automatically, only after calling set option with ZMQ_SUBSCRIBE or ZMQ_UNSUBSCRIBE - bool manual; + // Subscriptions will not bed added automatically, only after calling set option with ZMQ_SUBSCRIBE or ZMQ_UNSUBSCRIBE + bool manual; - // Last pipe send subscription message, only used if xpub is on manual - pipe_t *last_pipe; + // Last pipe send subscription message, only used if xpub is on manual + pipe_t *last_pipe; - // Welcome message to send to pipe when attached - msg_t welcome_msg; + // Welcome message to send to pipe when attached + msg_t welcome_msg; // List of pending (un)subscriptions, ie. those that were already // applied to the trie, but not yet received by the user. diff --git a/src/zmq.cpp b/src/zmq.cpp index 940c3b39..ca514a6b 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -1065,11 +1065,11 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) errno = ENOTSUP; return -1; #endif -} +} -// Create pollfd +// Create pollfd -void *zmq_pollfd_new () +void *zmq_pollfd_new () { return new zmq::signaler_t (); } @@ -1080,7 +1080,7 @@ int zmq_pollfd_close (void* p_) { zmq::signaler_t *s = (zmq::signaler_t*)p_; LIBZMQ_DELETE(s); - return 0; + return 0; } // Recv signal from pollfd @@ -1088,7 +1088,7 @@ int zmq_pollfd_close (void* p_) void zmq_pollfd_recv(void *p_) { zmq::signaler_t *s = (zmq::signaler_t*)p_; - s->recv (); + s->recv (); } // Wait until pollfd is signalled @@ -1096,7 +1096,7 @@ void zmq_pollfd_recv(void *p_) int zmq_pollfd_wait(void *p_, int timeout_) { zmq::signaler_t *s = (zmq::signaler_t*)p_; - return s->wait (timeout_); + return s->wait (timeout_); } // Get pollfd fd @@ -1108,7 +1108,7 @@ int zmq_pollfd_fd (void *p_) #endif { zmq::signaler_t *s = (zmq::signaler_t*)p_; - return s->get_fd (); + return s->get_fd (); } // Polling thread safe sockets version @@ -1153,27 +1153,27 @@ int zmq_pollfd_poll (void* p_, zmq_pollitem_t *items_, int nitems_, long timeout int thread_safe; size_t thread_safe_size = sizeof(int); - if (zmq_getsockopt (items_ [i].socket, ZMQ_THREAD_SAFE, &thread_safe, + if (zmq_getsockopt (items_ [i].socket, ZMQ_THREAD_SAFE, &thread_safe, &thread_safe_size) == -1) { return -1; } // All thread safe sockets share same fd if (thread_safe) { - + // if poll fd is not set yet and events are set for this socket if (!use_pollfd && items_ [i].events) { use_pollfd = true; pollfds_size++; } } - else + else pollfds_size++; } else pollfds_size++; } - + if (pollfds_size > ZMQ_POLLITEMS_DFLT) { pollfds = (pollfd*) malloc (pollfds_size * sizeof (pollfd)); alloc_assert (pollfds); @@ -1195,7 +1195,7 @@ int zmq_pollfd_poll (void* p_, zmq_pollitem_t *items_, int nitems_, long timeout int thread_safe; size_t thread_safe_size = sizeof(int); - if (zmq_getsockopt (items_ [i].socket, ZMQ_THREAD_SAFE, &thread_safe, + if (zmq_getsockopt (items_ [i].socket, ZMQ_THREAD_SAFE, &thread_safe, &thread_safe_size) == -1) { if (pollfds != spollfds) free (pollfds); @@ -1212,7 +1212,7 @@ int zmq_pollfd_poll (void* p_, zmq_pollitem_t *items_, int nitems_, long timeout return -1; } pollfds [pollfds_index].events = items_ [i].events ? POLLIN : 0; - pollfds_index++; + pollfds_index++; } } // Else, the poll item is a raw file descriptor. Just convert the @@ -1374,16 +1374,16 @@ int zmq_pollfd_poll (void* p_, zmq_pollitem_t *items_, int nitems_, long timeout int thread_safe; size_t thread_safe_size = sizeof(int); - if (zmq_getsockopt (items_ [i].socket, ZMQ_THREAD_SAFE, &thread_safe, - &thread_safe_size) == -1) + if (zmq_getsockopt (items_ [i].socket, ZMQ_THREAD_SAFE, &thread_safe, + &thread_safe_size) == -1) return -1; if (thread_safe && items_ [i].events) { use_pollfd = true; FD_SET (zmq_pollfd_fd (p_), &pollset_in); break; - } - } + } + } } zmq::fd_t maxfd = 0; @@ -1397,17 +1397,17 @@ int zmq_pollfd_poll (void* p_, zmq_pollitem_t *items_, int nitems_, long timeout int thread_safe; size_t thread_safe_size = sizeof(int); - if (zmq_getsockopt (items_ [i].socket, ZMQ_THREAD_SAFE, &thread_safe, - &thread_safe_size) == -1) + if (zmq_getsockopt (items_ [i].socket, ZMQ_THREAD_SAFE, &thread_safe, + &thread_safe_size) == -1) return -1; - + if (!thread_safe) { zmq::fd_t notify_fd; size_t zmq_fd_size = sizeof (zmq::fd_t); if (zmq_getsockopt (items_ [i].socket, ZMQ_FD, ¬ify_fd, &zmq_fd_size) == -1) return -1; - + if (items_ [i].events) { FD_SET (notify_fd, &pollset_in); if (maxfd < notify_fd) diff --git a/tests/test_connect_rid.cpp b/tests/test_connect_rid.cpp index 7b2c092e..4b6e032a 100644 --- a/tests/test_connect_rid.cpp +++ b/tests/test_connect_rid.cpp @@ -55,7 +55,7 @@ void test_stream_2_stream(){ assert (rconn1); ret = zmq_setsockopt (rconn1, ZMQ_LINGER, &zero, sizeof (zero)); assert (0 == ret); - + // Do the connection. ret = zmq_setsockopt (rconn1, ZMQ_CONNECT_RID, "conn1", 6); assert (0 == ret); @@ -67,7 +67,7 @@ void test_stream_2_stream(){ assert (0 == ret); ret = zmq_connect (rconn1, bindip); assert (0 == ret); -*/ +*/ // Send data to the bound stream. ret = zmq_send (rconn1, "conn1", 6, ZMQ_SNDMORE); assert (6 == ret); @@ -112,7 +112,7 @@ void test_router_2_router(bool named){ // Create connection socket. rconn1 = zmq_socket (ctx, ZMQ_ROUTER); - assert (rconn1); + assert (rconn1); ret = zmq_setsockopt (rconn1, ZMQ_LINGER, &zero, sizeof (zero)); assert (0 == ret); @@ -122,12 +122,12 @@ void test_router_2_router(bool named){ ret = zmq_setsockopt (rconn1, ZMQ_IDENTITY, "Y", 1); } - // Make call to connect using a connect_rid. + // Make call to connect using a connect_rid. ret = zmq_setsockopt (rconn1, ZMQ_CONNECT_RID, "conn1", 6); assert (0 == ret); ret = zmq_connect (rconn1, bindip); assert (0 == ret); -/* Uncomment to test assert on duplicate rid +/* Uncomment to test assert on duplicate rid // Test duplicate connect attempt. ret = zmq_setsockopt (rconn1, ZMQ_CONNECT_RID, "conn1", 6); assert (0 == ret); @@ -142,9 +142,9 @@ void test_router_2_router(bool named){ // Receive the name. ret = zmq_recv (rbind, buff, 256, 0); - if (named) + if (named) assert (ret && 'Y' == buff[0]); - else + else assert (ret && 0 == buff[0]); // Receive the data. @@ -162,7 +162,7 @@ void test_router_2_router(bool named){ } ret = zmq_send_const (rbind, "ok", 3, 0); assert (3 == ret); - + // If bound socket identity naming a problem, we'll likely see something funky here. ret = zmq_recv (rconn1, buff, 256, 0); assert ('c' == buff[0] && 6 == ret); @@ -183,7 +183,7 @@ int main (void) { setup_test_environment (); - test_stream_2_stream (); + test_stream_2_stream (); test_router_2_router (false); test_router_2_router (true); diff --git a/tests/test_srcfd.cpp b/tests/test_srcfd.cpp index faf2f2f8..09bd407e 100644 --- a/tests/test_srcfd.cpp +++ b/tests/test_srcfd.cpp @@ -44,7 +44,7 @@ int main (void) { int rc; - + setup_test_environment(); // Create the infrastructure void *ctx = zmq_ctx_new (); @@ -70,15 +70,15 @@ int main (void) zmq_recvmsg(rep, &msg, 0); assert(zmq_msg_size(&msg) == MSG_SIZE); - - // get the messages source file descriptor + + // get the messages source file descriptor int srcFd = zmq_msg_get(&msg, ZMQ_SRCFD); assert(srcFd >= 0); rc = zmq_msg_close(&msg); assert (rc == 0); - // get the remote endpoint + // get the remote endpoint struct sockaddr_storage ss; #ifdef ZMQ_HAVE_HPUX int addrlen = sizeof ss; @@ -92,7 +92,7 @@ int main (void) rc = getnameinfo ((struct sockaddr*) &ss, addrlen, host, sizeof host, NULL, 0, NI_NUMERICHOST); assert (rc == 0); - // assert it is localhost which connected + // assert it is localhost which connected assert (strcmp(host, "127.0.0.1") == 0); rc = zmq_close (rep); @@ -100,14 +100,14 @@ int main (void) rc = zmq_close (req); assert (rc == 0); - // sleep a bit for the socket to be freed - usleep(30000); - - // getting name from closed socket will fail + // sleep a bit for the socket to be freed + usleep(30000); + + // getting name from closed socket will fail rc = getpeername (srcFd, (struct sockaddr*) &ss, &addrlen); assert (rc == -1); assert (errno == EBADF); - + rc = zmq_ctx_term (ctx); assert (rc == 0); diff --git a/tests/test_stream_disconnect.cpp b/tests/test_stream_disconnect.cpp index 0c56328f..d114001b 100644 --- a/tests/test_stream_disconnect.cpp +++ b/tests/test_stream_disconnect.cpp @@ -50,7 +50,7 @@ bool has_more (void* socket) int more = 0; size_t more_size = sizeof(more); int rc = zmq_getsockopt (socket, ZMQ_RCVMORE, &more, &more_size); - if (rc != 0) + if (rc != 0) return false; return more != 0; } @@ -165,18 +165,18 @@ int main(int, char**) // Grab the 1st frame (peer identity). zmq_msg_t peer_frame; rc = zmq_msg_init (&peer_frame); - assert (rc == 0); + assert (rc == 0); rc = zmq_msg_recv (&peer_frame, sockets [SERVER], 0); - assert (rc != -1); - assert(zmq_msg_size (&peer_frame) > 0); + assert (rc != -1); + assert(zmq_msg_size (&peer_frame) > 0); assert (has_more (sockets [SERVER])); // Grab the 2nd frame (actual payload). zmq_msg_t data_frame; rc = zmq_msg_init (&data_frame); - assert (rc == 0); + assert (rc == 0); rc = zmq_msg_recv (&data_frame, sockets [SERVER], 0); - assert (rc != -1); + assert (rc != -1); // Make sure payload matches what we expect. const char * const data = (const char*)zmq_msg_data (&data_frame); @@ -184,39 +184,39 @@ int main(int, char**) // 0-length frame is a disconnection notification. The server // should receive it as the last step in the dialogue. if (size == 0) { - ++step; + ++step; assert (step == steps); } else { - assert ((size_t) size == strlen (dialog [step].text)); - int cmp = memcmp (dialog [step].text, data, size); - assert (cmp == 0); + assert ((size_t) size == strlen (dialog [step].text)); + int cmp = memcmp (dialog [step].text, data, size); + assert (cmp == 0); - ++step; + ++step; assert (step < steps); // Prepare the response. rc = zmq_msg_close (&data_frame); - assert (rc == 0); + assert (rc == 0); rc = zmq_msg_init_size (&data_frame, - strlen (dialog [step].text)); - assert (rc == 0); + strlen (dialog [step].text)); + assert (rc == 0); memcpy (zmq_msg_data (&data_frame), dialog [step].text, - zmq_msg_size (&data_frame)); + zmq_msg_size (&data_frame)); // Send the response. rc = zmq_msg_send (&peer_frame, sockets [SERVER], ZMQ_SNDMORE); - assert (rc != -1); + assert (rc != -1); rc = zmq_msg_send (&data_frame, sockets [SERVER], ZMQ_SNDMORE); - assert (rc != -1); + assert (rc != -1); } // Release resources. rc = zmq_msg_close (&peer_frame); - assert (rc == 0); + assert (rc == 0); rc = zmq_msg_close (&data_frame); - assert (rc == 0); + assert (rc == 0); } // Check for data received by the client. @@ -226,24 +226,24 @@ int main(int, char**) // Grab the 1st frame (peer identity). zmq_msg_t peer_frame; rc = zmq_msg_init (&peer_frame); - assert (rc == 0); + assert (rc == 0); rc = zmq_msg_recv (&peer_frame, sockets [CLIENT], 0); - assert (rc != -1); - assert(zmq_msg_size (&peer_frame) > 0); + assert (rc != -1); + assert(zmq_msg_size (&peer_frame) > 0); assert (has_more (sockets [CLIENT])); // Grab the 2nd frame (actual payload). zmq_msg_t data_frame; rc = zmq_msg_init (&data_frame); - assert (rc == 0); + assert (rc == 0); rc = zmq_msg_recv (&data_frame, sockets [CLIENT], 0); - assert (rc != -1); - assert(zmq_msg_size (&data_frame) > 0); + assert (rc != -1); + assert(zmq_msg_size (&data_frame) > 0); // Make sure payload matches what we expect. const char * const data = (const char*)zmq_msg_data (&data_frame); const int size = zmq_msg_size (&data_frame); - assert ((size_t)size == strlen(dialog [step].text)); + assert ((size_t)size == strlen(dialog [step].text)); int cmp = memcmp(dialog [step].text, data, size); assert (cmp == 0); @@ -252,22 +252,22 @@ int main(int, char**) // Prepare the response (next line in the dialog). assert (step < steps); rc = zmq_msg_close (&data_frame); - assert (rc == 0); + assert (rc == 0); rc = zmq_msg_init_size (&data_frame, strlen (dialog [step].text)); - assert (rc == 0); + assert (rc == 0); memcpy (zmq_msg_data (&data_frame), dialog [step].text, zmq_msg_size (&data_frame)); // Send the response. rc = zmq_msg_send (&peer_frame, sockets [CLIENT], ZMQ_SNDMORE); - assert (rc != -1); + assert (rc != -1); rc = zmq_msg_send (&data_frame, sockets [CLIENT], ZMQ_SNDMORE); - assert (rc != -1); + assert (rc != -1); // Release resources. rc = zmq_msg_close (&peer_frame); - assert (rc == 0); + assert (rc == 0); rc = zmq_msg_close (&data_frame); - assert (rc == 0); + assert (rc == 0); } } assert (step == steps); diff --git a/tests/test_thread_safe.cpp b/tests/test_thread_safe.cpp index 3410260e..b07afaa4 100644 --- a/tests/test_thread_safe.cpp +++ b/tests/test_thread_safe.cpp @@ -56,42 +56,42 @@ int main (void) rc = zmq_connect (client2, "tcp://127.0.0.1:5560"); assert (rc == 0); - void* t1 = zmq_threadstart(worker1, client2); - void* t2 = zmq_threadstart(worker2, client2); + void* t1 = zmq_threadstart(worker1, client2); + void* t2 = zmq_threadstart(worker2, client2); - char data[1]; - data[0] = 0; + char data[1]; + data[0] = 0; - for (int i=0; i < 10; i++) { - rc = zmq_send_const(client, data, 1, 0); - assert (rc == 1); + for (int i=0; i < 10; i++) { + rc = zmq_send_const(client, data, 1, 0); + assert (rc == 1); - rc = zmq_send_const(client, data, 1, 0); - assert(rc == 1); + rc = zmq_send_const(client, data, 1, 0); + assert(rc == 1); - char a, b; + char a, b; - rc = zmq_recv(client, &a, 1, 0); - assert(rc == 1); + rc = zmq_recv(client, &a, 1, 0); + assert(rc == 1); - rc = zmq_recv(client, &b, 1, 0); - assert(rc == 1); + rc = zmq_recv(client, &b, 1, 0); + assert(rc == 1); - // make sure they came from different threads - assert((a == 1 && b == 2) || (a == 2 && b == 1)); - } + // make sure they came from different threads + assert((a == 1 && b == 2) || (a == 2 && b == 1)); + } - // make the thread exit - data[0] = 1; + // make the thread exit + data[0] = 1; - rc = zmq_send_const(client, data, 1, 0); - assert (rc == 1); + rc = zmq_send_const(client, data, 1, 0); + assert (rc == 1); - rc = zmq_send_const(client, data, 1, 0); - assert(rc == 1); + rc = zmq_send_const(client, data, 1, 0); + assert(rc == 1); - zmq_threadclose(t1); - zmq_threadclose(t2); + zmq_threadclose(t1); + zmq_threadclose(t2); rc = zmq_close (client2); assert (rc == 0); @@ -107,52 +107,52 @@ int main (void) void worker1(void* s) { - const char worker_id = 1; - char c; + const char worker_id = 1; + char c; - while (true) - { - int rc = zmq_recv(s, &c,1, 0); - assert(rc == 1); + while (true) + { + int rc = zmq_recv(s, &c,1, 0); + assert(rc == 1); - if (c == 0) - { - msleep(100); - rc = zmq_send_const(s,&worker_id, 1, 0); - assert(rc == 1); - } - else - { - // we got exit request - break; - } - } + if (c == 0) + { + msleep(100); + rc = zmq_send_const(s,&worker_id, 1, 0); + assert(rc == 1); + } + else + { + // we got exit request + break; + } + } } void worker2(void* s) { - const char worker_id = 2; - char c; + const char worker_id = 2; + char c; - while (true) - { - int rc = zmq_recv(s, &c,1, 0); - assert(rc == 1); + while (true) + { + int rc = zmq_recv(s, &c,1, 0); + assert(rc == 1); - assert(c == 1 || c == 0); + assert(c == 1 || c == 0); - if (c == 0) - { - msleep(100); - rc = zmq_send_const(s,&worker_id, 1, 0); - assert(rc == 1); - } - else - { - // we got exit request - break; - } - } + if (c == 0) + { + msleep(100); + rc = zmq_send_const(s,&worker_id, 1, 0); + assert(rc == 1); + } + else + { + // we got exit request + break; + } + } } diff --git a/tests/test_xpub_manual.cpp b/tests/test_xpub_manual.cpp index 08e6a638..b32a008c 100644 --- a/tests/test_xpub_manual.cpp +++ b/tests/test_xpub_manual.cpp @@ -43,42 +43,42 @@ int main (void) // set pub socket options int manual = 1; - rc = zmq_setsockopt(pub, ZMQ_XPUB_MANUAL, &manual, 4); - assert (rc == 0); + rc = zmq_setsockopt(pub, ZMQ_XPUB_MANUAL, &manual, 4); + assert (rc == 0); // Create a subscriber void *sub = zmq_socket (ctx, ZMQ_XSUB); assert (sub); rc = zmq_connect (sub, "inproc://soname"); - assert (rc == 0); - + assert (rc == 0); + // Subscribe for A - char subscription[2] = { 1, 'A'}; - rc = zmq_send_const(sub, subscription, 2, 0); - assert (rc == 2); + char subscription[2] = { 1, 'A'}; + rc = zmq_send_const(sub, subscription, 2, 0); + assert (rc == 2); - char buffer[2]; - - // Receive subscriptions from subscriber - rc = zmq_recv(pub, buffer, 2, 0); - assert(rc == 2); - assert(buffer[0] == 1); - assert(buffer[1] == 'A'); + char buffer[2]; - // Subscribe socket for B instead - rc = zmq_setsockopt(pub, ZMQ_SUBSCRIBE, "B", 1); - assert(rc == 0); + // Receive subscriptions from subscriber + rc = zmq_recv(pub, buffer, 2, 0); + assert(rc == 2); + assert(buffer[0] == 1); + assert(buffer[1] == 'A'); - // Sending A message and B Message - rc = zmq_send_const(pub, "A", 1, 0); - assert(rc == 1); + // Subscribe socket for B instead + rc = zmq_setsockopt(pub, ZMQ_SUBSCRIBE, "B", 1); + assert(rc == 0); - rc = zmq_send_const(pub, "B", 1, 0); - assert(rc == 1); + // Sending A message and B Message + rc = zmq_send_const(pub, "A", 1, 0); + assert(rc == 1); - rc = zmq_recv(sub, buffer, 1, ZMQ_DONTWAIT); - assert(rc == 1); - assert(buffer[0] == 'B'); + rc = zmq_send_const(pub, "B", 1, 0); + assert(rc == 1); + + rc = zmq_recv(sub, buffer, 1, ZMQ_DONTWAIT); + assert(rc == 1); + assert(buffer[0] == 'B'); // Clean up. rc = zmq_close (pub); diff --git a/tests/test_xpub_welcome_msg.cpp b/tests/test_xpub_welcome_msg.cpp index b6d19014..33cd97ed 100644 --- a/tests/test_xpub_welcome_msg.cpp +++ b/tests/test_xpub_welcome_msg.cpp @@ -41,20 +41,20 @@ int main (void) int rc = zmq_bind (pub, "inproc://soname"); assert (rc == 0); - // set pub socket options - rc = zmq_setsockopt(pub, ZMQ_XPUB_WELCOME_MSG, "W", 1); - assert (rc == 0); + // set pub socket options + rc = zmq_setsockopt(pub, ZMQ_XPUB_WELCOME_MSG, "W", 1); + assert (rc == 0); // Create a subscriber void *sub = zmq_socket (ctx, ZMQ_SUB); - + // Subscribe to the welcome message rc = zmq_setsockopt(sub, ZMQ_SUBSCRIBE, "W", 1); assert(rc == 0); assert (sub); rc = zmq_connect (sub, "inproc://soname"); - assert (rc == 0); + assert (rc == 0); char buffer[2]; @@ -63,11 +63,11 @@ int main (void) assert(rc == 2); assert(buffer[0] == 1); assert(buffer[1] == 'W'); - + // Receive the welcome message rc = zmq_recv(sub, buffer, 1, 0); assert(rc == 1); - assert(buffer[0] == 'W'); + assert(buffer[0] == 'W'); // Clean up. rc = zmq_close (pub);