diff --git a/src/zmq.cpp b/src/zmq.cpp index 7157b038..6daf7c33 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -478,10 +478,6 @@ int zmq_recv (void *s_, void *buf_, size_t len_, int flags_) // The iov_base* buffers of each iovec *a_ filled in by this // function may be freed using free(). // -// Implementation note: We assume zmq::msg_t buffer allocated -// by zmq::recvmsg can be freed by free(). -// We assume it is safe to steal these buffers by simply -// not closing the zmq::msg_t. // int zmq_recviov (void *s_, iovec *a_, size_t *count_, int flags_) { @@ -498,8 +494,7 @@ int zmq_recviov (void *s_, iovec *a_, size_t *count_, int flags_) *count_ = 0; for (size_t i = 0; recvmore && i < count; ++i) { - // Cheat! We never close any msg - // because we want to steal the buffer. + zmq_msg_t msg; int rc = zmq_msg_init (&msg); errno_assert (rc == 0); @@ -513,15 +508,21 @@ int zmq_recviov (void *s_, iovec *a_, size_t *count_, int flags_) nread = -1; break; } - ++*count_; - ++nread; - // Cheat: acquire zmq_msg buffer. - a_[i].iov_base = static_cast (zmq_msg_data (&msg)); a_[i].iov_len = zmq_msg_size (&msg); - + a_[i].iov_base = malloc(a_[i].iov_len); + if (!a_[i].iov_base) { + errno = ENOMEM; + return -1; + } + memcpy(a_[i].iov_base,static_cast (zmq_msg_data (&msg)), + a_[i].iov_len); // Assume zmq_socket ZMQ_RVCMORE is properly set. recvmore = ((zmq::msg_t*) (void *) &msg)->flags () & zmq::msg_t::more; + rc = zmq_msg_close(&msg); + errno_assert (rc == 0); + ++*count_; + ++nread; } return nread; } diff --git a/tests/Makefile.am b/tests/Makefile.am index 849230b7..139d78d6 100644 --- a/tests/Makefile.am +++ b/tests/Makefile.am @@ -20,7 +20,8 @@ noinst_PROGRAMS = test_pair_inproc \ test_router_mandatory \ test_raw_sock \ test_disconnect_inproc \ - test_ctx_options + test_ctx_options \ + test_iov if !ON_MINGW noinst_PROGRAMS += test_shutdown_stress \ diff --git a/tests/test_iov.cpp b/tests/test_iov.cpp new file mode 100644 index 00000000..b5ac817b --- /dev/null +++ b/tests/test_iov.cpp @@ -0,0 +1,111 @@ +/* + Copyright (c) 2007-2013 Contributors as noted in the AUTHORS file + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include "../include/zmq.h" +#include +#include +#include +#include +#undef NDEBUG +#include + +// XSI vector I/O +#if defined ZMQ_HAVE_UIO +#include +#else +struct iovec { + void *iov_base; + size_t iov_len; +}; +#endif + +void do_check(void* sb, void* sc, unsigned int msgsz) +{ + int rc; + int sum =0; + for (int i = 0; i < 10; i++) + { + zmq_msg_t msg; + zmq_msg_init_size(&msg, msgsz); + void * data = zmq_msg_data(&msg); + memcpy(data,&i, sizeof(int)); + rc = zmq_msg_send(&msg,sc,i==9 ? 0 :ZMQ_SNDMORE); + assert (rc == (int)msgsz); + zmq_msg_close(&msg); + sum += i; + } + + struct iovec ibuffer[32] ; + memset(&ibuffer[0], 0, sizeof(ibuffer)); + + size_t count = 10; + rc = zmq_recviov(sb,&ibuffer[0],&count,0); + assert (rc == 10); + + int rsum=0; + for(;count;--count) + { + int v; + memcpy(&v,ibuffer[count-1].iov_base,sizeof(int)); + rsum += v; + assert(ibuffer[count-1].iov_len == msgsz); + // free up the memory + free(ibuffer[count-1].iov_base); + } + + assert ( sum == rsum ); + +} + +int main (void) +{ + void *ctx = zmq_ctx_new (); + assert (ctx); + int rc; + + void *sb = zmq_socket (ctx, ZMQ_PULL); + assert (sb); + + rc = zmq_bind (sb, "inproc://a"); + assert (rc == 0); + + ::sleep(1); + void *sc = zmq_socket (ctx, ZMQ_PUSH); + + rc = zmq_connect (sc, "inproc://a"); + assert (rc == 0); + + + // message bigger than vsm max + do_check(sb,sc,100); + + // message smaller than vsm max + do_check(sb,sc,10); + + rc = zmq_close (sc); + assert (rc == 0); + + rc = zmq_close (sb); + assert (rc == 0); + + rc = zmq_ctx_term (ctx); + assert (rc == 0); + + return 0; +}