diff --git a/AUTHORS b/AUTHORS index 53982ad8..3497eae8 100644 --- a/AUTHORS +++ b/AUTHORS @@ -77,6 +77,7 @@ Toralf Wittner Tore Halvorsen Vitaly Mayatskikh Lourens Naudé +Hardeep Singh Credits ======= diff --git a/include/zmq.h b/include/zmq.h index d4c6070f..85366ed2 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -250,6 +250,7 @@ ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int option, int optval); #define ZMQ_TCP_ACCEPT_FILTER 38 #define ZMQ_DELAY_ATTACH_ON_CONNECT 39 #define ZMQ_XPUB_VERBOSE 40 +#define ZMQ_ROUTER_RAW_SOCK 41 /* Message options */ diff --git a/src/Makefile.am b/src/Makefile.am index 340c997b..f06a967d 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -146,7 +146,11 @@ libzmq_la_SOURCES = \ v1_protocol.hpp \ xsub.cpp \ zmq.cpp \ - zmq_utils.cpp + zmq_utils.cpp \ + raw_decoder.hpp \ + raw_decoder.cpp \ + raw_encoder.hpp \ + raw_encoder.cpp if ON_MINGW libzmq_la_LDFLAGS = -no-undefined -avoid-version -version-info @LTVER@ @LIBZMQ_EXTRA_LDFLAGS@ diff --git a/src/decoder.hpp b/src/decoder.hpp index baf67cfa..4a0c0ed6 100644 --- a/src/decoder.hpp +++ b/src/decoder.hpp @@ -143,6 +143,11 @@ namespace zmq } } + inline bool message_ready_size (size_t msg_sz){ + zmq_assert(false); + return false; + } + protected: // Prototype of state machine action. Action should return false if diff --git a/src/i_decoder.hpp b/src/i_decoder.hpp index 2555f128..549fc133 100644 --- a/src/i_decoder.hpp +++ b/src/i_decoder.hpp @@ -41,7 +41,8 @@ namespace zmq virtual size_t process_buffer (unsigned char *data_, size_t size_) = 0; virtual bool stalled () const = 0; - + + virtual bool message_ready_size (size_t msg_sz) = 0; }; } diff --git a/src/options.cpp b/src/options.cpp index d7f9b092..0b09a209 100644 --- a/src/options.cpp +++ b/src/options.cpp @@ -55,6 +55,7 @@ zmq::options_t::options_t () : tcp_keepalive_intvl (-1), socket_id (0) { + raw_sock = false; } int zmq::options_t::setsockopt (int option_, const void *optval_, diff --git a/src/options.hpp b/src/options.hpp index 33c73a6b..403d59fc 100644 --- a/src/options.hpp +++ b/src/options.hpp @@ -114,6 +114,9 @@ namespace zmq // If true, the identity message is forwarded to the socket. bool recv_identity; + + // if true, router socket accepts non-zmq tcp connections + bool raw_sock; // TCP keep-alive settings. // Defaults to -1 = do not change socket options diff --git a/src/raw_decoder.cpp b/src/raw_decoder.cpp new file mode 100644 index 00000000..f7e9925b --- /dev/null +++ b/src/raw_decoder.cpp @@ -0,0 +1,99 @@ +/* + Copyright (c) 2009-2011 250bpm s.r.o. + Copyright (c) 2007-2009 iMatix Corporation + Copyright (c) 2007-2012 Other contributors as noted in the AUTHORS file + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include +#include + +#include "platform.hpp" +#ifdef ZMQ_HAVE_WINDOWS +#include "windows.hpp" +#endif + +#include "raw_decoder.hpp" +#include "likely.hpp" +#include "wire.hpp" +#include "err.hpp" + +zmq::raw_decoder_t::raw_decoder_t (size_t bufsize_, + int64_t maxmsgsize_, i_msg_sink *msg_sink_) : + decoder_base_t (bufsize_), + msg_sink (msg_sink_), + maxmsgsize (maxmsgsize_) +{ + int rc = in_progress.init (); + errno_assert (rc == 0); +} + +zmq::raw_decoder_t::~raw_decoder_t () +{ + int rc = in_progress.close (); + errno_assert (rc == 0); +} + +void zmq::raw_decoder_t::set_msg_sink (i_msg_sink *msg_sink_) +{ + msg_sink = msg_sink_; +} + +bool zmq::raw_decoder_t::stalled () const +{ + return false; +} + +bool zmq::raw_decoder_t::message_ready_size (size_t msg_sz) +{ + int rc = in_progress.init_size (msg_sz); + if (rc != 0) { + errno_assert (errno == ENOMEM); + rc = in_progress.init (); + errno_assert (rc == 0); + decoding_error (); + return false; + } + + next_step (in_progress.data (), in_progress.size (), + &raw_decoder_t::raw_message_ready); + + return true; +} + +bool zmq::raw_decoder_t::raw_message_ready () +{ + zmq_assert (in_progress.size ()); + // Message is completely read. Push it further and start reading + // new message. (in_progress is a 0-byte message after this point.) + if (unlikely (!msg_sink)) + return false; + int rc = msg_sink->push_msg (&in_progress); + if (unlikely (rc != 0)) { + if (errno != EAGAIN) + decoding_error (); + return false; + } + + // NOTE: This is just to break out of process_buffer + // raw_message_ready should never get called in state machine w/o + // message_ready_size from stream_engine. + next_step (in_progress.data (), 1, + &raw_decoder_t::raw_message_ready); + + return true; +} diff --git a/src/raw_decoder.hpp b/src/raw_decoder.hpp new file mode 100644 index 00000000..05bacb1f --- /dev/null +++ b/src/raw_decoder.hpp @@ -0,0 +1,69 @@ +/* + Copyright (c) 2009-2011 250bpm s.r.o. + Copyright (c) 2007-2012 iMatix Corporation + Copyright (c) 2007-2012 Other contributors as noted in the AUTHORS file + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#ifndef __ZMQ_RAW_DECODER_HPP_INCLUDED__ +#define __ZMQ_RAW_DECODER_HPP_INCLUDED__ + +#include "err.hpp" +#include "msg.hpp" +#include "decoder.hpp" +#include "raw_decoder.hpp" +#include "i_msg_sink.hpp" +#include "stdint.hpp" + +namespace zmq +{ + + // Decoder for 0MQ v1 framing protocol. Converts data stream into messages. + + class raw_decoder_t : public decoder_base_t + { + public: + + raw_decoder_t (size_t bufsize_, + int64_t maxmsgsize_, i_msg_sink *msg_sink_); + virtual ~raw_decoder_t (); + + // i_decoder interface. + virtual void set_msg_sink (i_msg_sink *msg_sink_); + + virtual bool stalled () const; + + virtual bool message_ready_size (size_t msg_sz); + + private: + + + bool raw_message_ready (); + + i_msg_sink *msg_sink; + msg_t in_progress; + + const int64_t maxmsgsize; + + raw_decoder_t (const raw_decoder_t&); + void operator = (const raw_decoder_t&); + }; + +} + +#endif + diff --git a/src/raw_encoder.cpp b/src/raw_encoder.cpp new file mode 100644 index 00000000..eac12934 --- /dev/null +++ b/src/raw_encoder.cpp @@ -0,0 +1,87 @@ +/* + Copyright (c) 2007-2012 iMatix Corporation + Copyright (c) 2009-2011 250bpm s.r.o. + Copyright (c) 2011 VMware, Inc. + Copyright (c) 2007-2012 Other contributors as noted in the AUTHORS file + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include "encoder.hpp" +#include "raw_encoder.hpp" +#include "i_msg_source.hpp" +#include "likely.hpp" +#include "wire.hpp" + +zmq::raw_encoder_t::raw_encoder_t (size_t bufsize_, i_msg_source *msg_source_) : + encoder_base_t (bufsize_), + msg_source (msg_source_) +{ + int rc = in_progress.init (); + errno_assert (rc == 0); + + // Write 0 bytes to the batch and go to message_ready state. + next_step (NULL, 0, &raw_encoder_t::raw_message_ready, true); +} + +zmq::raw_encoder_t::~raw_encoder_t () +{ + int rc = in_progress.close (); + errno_assert (rc == 0); +} + +void zmq::raw_encoder_t::set_msg_source (i_msg_source *msg_source_) +{ + msg_source = msg_source_; +} + +bool zmq::raw_encoder_t::raw_message_size_ready () +{ + // Write message body into the buffer. + next_step (in_progress.data (), in_progress.size (), + &raw_encoder_t::raw_message_ready, !(in_progress.flags () & msg_t::more)); + return true; +} + +bool zmq::raw_encoder_t::raw_message_ready () +{ + + // Destroy content of the old message. + int rc = in_progress.close (); + errno_assert (rc == 0); + + // Read new message. If there is none, return false. + // Note that new state is set only if write is successful. That way + // unsuccessful write will cause retry on the next state machine + // invocation. + if (unlikely (!msg_source)) { + rc = in_progress.init (); + errno_assert (rc == 0); + return false; + } + rc = msg_source->pull_msg (&in_progress); + if (unlikely (rc != 0)) { + errno_assert (errno == EAGAIN); + rc = in_progress.init (); + errno_assert (rc == 0); + return false; + } + + in_progress.reset_flags(0xff); + next_step (NULL, 0, &raw_encoder_t::raw_message_size_ready, true); + + return true; +} diff --git a/src/raw_encoder.hpp b/src/raw_encoder.hpp new file mode 100644 index 00000000..748a68fd --- /dev/null +++ b/src/raw_encoder.hpp @@ -0,0 +1,69 @@ +/* + Copyright (c) 2009-2011 250bpm s.r.o. + Copyright (c) 2007-2009 iMatix Corporation + Copyright (c) 2007-2012 Other contributors as noted in the AUTHORS file + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#ifndef __ZMQ_RAW_ENCODER_HPP_INCLUDED__ +#define __ZMQ_RAW_ENCODER_HPP_INCLUDED__ + +#if defined(_MSC_VER) +#ifndef NOMINMAX +#define NOMINMAX +#endif +#endif + +#include +#include +#include +#include + +#include "err.hpp" +#include "msg.hpp" +#include "i_encoder.hpp" + +namespace zmq +{ + + + // Encoder for 0MQ framing protocol. Converts messages into data batches. + + class raw_encoder_t : public encoder_base_t + { + public: + + raw_encoder_t (size_t bufsize_, i_msg_source *msg_source_); + ~raw_encoder_t (); + + void set_msg_source (i_msg_source *msg_source_); + + private: + + bool raw_message_ready (); + bool raw_message_size_ready (); + + i_msg_source *msg_source; + msg_t in_progress; + unsigned char tmpbuf [4]; + raw_encoder_t (const raw_encoder_t&); + const raw_encoder_t &operator = (const raw_encoder_t&); + }; +} + +#endif + diff --git a/src/router.cpp b/src/router.cpp index 9374f247..67de97bd 100644 --- a/src/router.cpp +++ b/src/router.cpp @@ -35,7 +35,8 @@ zmq::router_t::router_t (class ctx_t *parent_, uint32_t tid_, int sid_) : current_out (NULL), more_out (false), next_peer_id (generate_random ()), - mandatory(false) + mandatory(false), + raw_sock(false) { options.type = ZMQ_ROUTER; @@ -46,6 +47,7 @@ zmq::router_t::router_t (class ctx_t *parent_, uint32_t tid_, int sid_) : // options.delay_on_disconnect = false; options.recv_identity = true; + options.raw_sock = false; prefetched_id.init (); prefetched_msg.init (); @@ -76,7 +78,8 @@ void zmq::router_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_) int zmq::router_t::xsetsockopt (int option_, const void *optval_, size_t optvallen_) { - if (option_ != ZMQ_ROUTER_MANDATORY) { + if (option_ != ZMQ_ROUTER_MANDATORY && + option_ != ZMQ_ROUTER_RAW_SOCK) { errno = EINVAL; return -1; } @@ -84,7 +87,16 @@ int zmq::router_t::xsetsockopt (int option_, const void *optval_, errno = EINVAL; return -1; } - mandatory = *static_cast (optval_); + if(option_ == ZMQ_ROUTER_RAW_SOCK){ + raw_sock = *static_cast (optval_); + if(raw_sock){ + options.recv_identity = false; + options.raw_sock = true; + } + + }else{ + mandatory = *static_cast (optval_); + } return 0; } @@ -174,11 +186,27 @@ int zmq::router_t::xsend (msg_t *msg_, int flags_) return 0; } + // ignore the MORE flag for raw-sock or assert? + if(options.raw_sock) + msg_->reset_flags(msg_t::more); + // Check whether this is the last part of the message. more_out = msg_->flags () & msg_t::more ? true : false; // Push the message into the pipe. If there's no out pipe, just drop it. if (current_out) { + + // Close the remote connection if user has asked to do so + // by sending zero length message. + // Pending messages in the pipe will be dropped (on receiving term- ack) + if (raw_sock && msg_->size() == 0){ + current_out->terminate(false); + int rc = msg_->close (); + errno_assert (rc == 0); + current_out = NULL; + return 0; + } + bool ok = current_out->write (msg_); if (unlikely (!ok)) current_out = NULL; @@ -319,28 +347,36 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_) { msg_t msg; blob_t identity; + bool ok; - msg.init (); - bool ok = pipe_->read (&msg); - if (!ok) - return false; - - if (msg.size () == 0) { - // Fall back on the auto-generation + if(options.raw_sock){ // always assign identity for raw-socket unsigned char buf [5]; buf [0] = 0; put_uint32 (buf + 1, next_peer_id++); identity = blob_t (buf, sizeof buf); - msg.close (); - } - else { - identity = blob_t ((unsigned char*) msg.data (), msg.size ()); - outpipes_t::iterator it = outpipes.find (identity); - msg.close (); - - // Ignore peers with duplicate ID. - if (it != outpipes.end ()) + }else{ + msg.init (); + ok = pipe_->read (&msg); + if (!ok) return false; + + if (msg.size () == 0) { + // Fall back on the auto-generation + unsigned char buf [5]; + buf [0] = 0; + put_uint32 (buf + 1, next_peer_id++); + identity = blob_t (buf, sizeof buf); + msg.close (); + } + else { + identity = blob_t ((unsigned char*) msg.data (), msg.size ()); + outpipes_t::iterator it = outpipes.find (identity); + msg.close (); + + // Ignore peers with duplicate ID. + if (it != outpipes.end ()) + return false; + } } pipe_->set_identity (identity); diff --git a/src/router.hpp b/src/router.hpp index f9766553..1a4b10b5 100644 --- a/src/router.hpp +++ b/src/router.hpp @@ -113,6 +113,7 @@ namespace zmq // If true, report EAGAIN to the caller instead of silently dropping // the message targeting an unknown peer. bool mandatory; + bool raw_sock; router_t (const router_t&); const router_t &operator = (const router_t&); diff --git a/src/session_base.cpp b/src/session_base.cpp index 145c047c..97fcc6d3 100644 --- a/src/session_base.cpp +++ b/src/session_base.cpp @@ -120,6 +120,11 @@ zmq::session_base_t::session_base_t (class io_thread_t *io_thread_, identity_received (false), addr (addr_) { + // identities are not exchanged for raw sockets + if(options.raw_sock){ + identity_sent = (true); + identity_received = (true); + } } zmq::session_base_t::~session_base_t () @@ -245,6 +250,15 @@ void zmq::session_base_t::terminated (pipe_t *pipe_) // Remove the pipe from the detached pipes set terminating_pipes.erase (pipe_); + if (!is_terminating() && options.raw_sock){ + if(engine){ + engine->terminate (); + engine = NULL; + } + terminate(); + } + + // If we are waiting for pending messages to be sent, at this point // we are sure that there will be no more messages and we can proceed // with termination safely. diff --git a/src/stream_engine.cpp b/src/stream_engine.cpp index 072f330d..01ae0c21 100644 --- a/src/stream_engine.cpp +++ b/src/stream_engine.cpp @@ -42,6 +42,8 @@ #include "decoder.hpp" #include "v1_encoder.hpp" #include "v1_decoder.hpp" +#include "raw_decoder.hpp" +#include "raw_encoder.hpp" #include "config.hpp" #include "err.hpp" #include "ip.hpp" @@ -133,13 +135,26 @@ void zmq::stream_engine_t::plug (io_thread_t *io_thread_, io_object_t::plug (io_thread_); handle = add_fd (s); - // Send the 'length' and 'flags' fields of the identity message. - // The 'length' field is encoded in the long format. - outpos = greeting_output_buffer; - outpos [outsize++] = 0xff; - put_uint64 (&outpos [outsize], options.identity_size + 1); - outsize += 8; - outpos [outsize++] = 0x7f; + if(options.raw_sock){ + // no handshaking for raw sock, instantiate raw encoder and decoders + encoder = new (std::nothrow) raw_encoder_t (out_batch_size, session); + alloc_assert (encoder); + + decoder = new (std::nothrow) + raw_decoder_t (in_batch_size, options.maxmsgsize, session); + alloc_assert (decoder); + + // disable handshaking for raw socket + handshaking = false; + }else{ + // Send the 'length' and 'flags' fields of the identity message. + // The 'length' field is encoded in the long format. + outpos = greeting_output_buffer; + outpos [outsize++] = 0xff; + put_uint64 (&outpos [outsize], options.identity_size + 1); + outsize += 8; + outpos [outsize++] = 0x7f; + } set_pollin (handle); set_pollout (handle); @@ -181,6 +196,7 @@ void zmq::stream_engine_t::in_event () zmq_assert (decoder); bool disconnection = false; + size_t processed; // If there's no data to process in the buffer... if (!insize) { @@ -199,8 +215,16 @@ void zmq::stream_engine_t::in_event () } } - // Push the data to the decoder. - size_t processed = decoder->process_buffer (inpos, insize); + if(options.raw_sock){ + if(insize == 0 || !decoder->message_ready_size(insize)){ + processed = 0; + }else{ + processed = decoder->process_buffer (inpos, insize); + } + }else{ + // Push the data to the decoder. + processed = decoder->process_buffer (inpos, insize); + } if (unlikely (processed == (size_t) -1)) { disconnection = true; diff --git a/tests/Makefile.am b/tests/Makefile.am index cb96e66a..e35ea470 100644 --- a/tests/Makefile.am +++ b/tests/Makefile.am @@ -17,7 +17,8 @@ noinst_PROGRAMS = test_pair_inproc \ test_last_endpoint \ test_term_endpoint \ test_monitor \ - test_router_mandatory + test_router_mandatory \ + test_raw_sock if !ON_MINGW noinst_PROGRAMS += test_shutdown_stress \ @@ -41,6 +42,7 @@ test_last_endpoint_SOURCES = test_last_endpoint.cpp test_term_endpoint_SOURCES = test_term_endpoint.cpp test_monitor_SOURCES = test_monitor.cpp test_router_mandatory_SOURCES = test_router_mandatory.cpp +test_raw_sock_SOURCES = test_raw_sock.cpp if !ON_MINGW test_shutdown_stress_SOURCES = test_shutdown_stress.cpp diff --git a/tests/test_raw_sock.cpp b/tests/test_raw_sock.cpp new file mode 100644 index 00000000..99a03e4a --- /dev/null +++ b/tests/test_raw_sock.cpp @@ -0,0 +1,167 @@ +/* + Copyright (c) 2007-2012 iMatix Corporation + Copyright (c) 2011 250bpm s.r.o. + Copyright (c) 2007-2012 Other contributors as noted in the AUTHORS file + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +//ToDo: Windows? +const char *test_str = "TEST-STRING"; + + +int tcp_client(){ + + int sockfd, portno; + struct sockaddr_in serv_addr; + struct hostent *server; + + portno = 5555; + + sockfd = socket(AF_INET, SOCK_STREAM, 0); + assert(sockfd >=0 ); + server = gethostbyname("localhost"); + assert(server); + + bzero((char *) &serv_addr, sizeof(serv_addr)); + serv_addr.sin_family = AF_INET; + bcopy((char *)server->h_addr, + (char *)&serv_addr.sin_addr.s_addr, + server->h_length); + serv_addr.sin_port = htons(portno); + + if (connect(sockfd,(struct sockaddr *) &serv_addr,sizeof(serv_addr)) < 0) + assert(0); + int nodelay = 1; + int rc = setsockopt (sockfd, IPPROTO_TCP, TCP_NODELAY, (char*) &nodelay, + sizeof (int)); + assert(rc == 0); + + + return sockfd; +} + +void tcp_client_write(int sockfd, const void *buf, int buf_len){ + assert(buf); + int n = write(sockfd, buf, buf_len); + assert(n >= 0); +} + +void tcp_client_read(int sockfd){ + struct timeval tm; + tm.tv_sec = 1; + tm.tv_usec = 0; + fd_set r; + + int sr; + char buffer[16]; + + FD_ZERO(&r); + FD_SET(sockfd, &r); + + if ((sr = select(sockfd + 1, &r, NULL, NULL, &tm)) <= 0) + { + assert(0); + } + + int n = read(sockfd, buffer, 16); + assert(n>0); + assert(memcmp(buffer, test_str, strlen(test_str)) == 0); +} + + +void tcp_client_close(int sockfd){ + close(sockfd); +} + + +int main(){ + fprintf (stderr, "test_raw_sock running...\n"); + + zmq_msg_t message; + zmq_msg_t id; + + //=================== + void *ctx = zmq_init (1); + assert (ctx); + + int raw_sock = 1, rc = 0; + void *sb = zmq_socket (ctx, ZMQ_ROUTER); + assert (sb); + rc = zmq_setsockopt( sb, ZMQ_ROUTER_RAW_SOCK, &raw_sock, sizeof(int)); + assert(rc == 0); + rc = zmq_bind (sb, "tcp://127.0.0.1:5555"); + assert (rc == 0); + + int sock_fd = tcp_client(); + assert(sock_fd >= 0); + // =================== + + zmq_msg_init(&message); + zmq_msg_init(&id); + assert (rc == 0); + + zmq_pollitem_t items [] = { + { sb, 0, ZMQ_POLLIN, 0 }, + }; + + tcp_client_write(sock_fd, test_str, strlen(test_str)); + zmq_poll (items, 1, 500); + if (items [0].revents & ZMQ_POLLIN) { + int n = zmq_msg_recv (&id, sb, 0); + assert(n > 0); + n = zmq_msg_recv (&message, sb, 0); + assert(n > 0); + assert(memcmp(zmq_msg_data (&message), test_str, strlen(test_str)) == 0); + }else{ + assert(0); + } + + zmq_msg_send (&id, sb, ZMQ_SNDMORE); + zmq_msg_send (&message, sb, ZMQ_SNDMORE);// SNDMORE option is ignored + + tcp_client_read(sock_fd); + tcp_client_close(sock_fd); + + zmq_msg_close(&id); + zmq_msg_close(&message); + + + zmq_close(sb); + zmq_term(ctx); + + fprintf (stderr, "test_raw_sock PASSED.\n"); + + return 0; +} + + + +