diff --git a/include/zmq.h b/include/zmq.h index 5dff076d..2d493bfc 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -87,6 +87,7 @@ typedef unsigned __int8 uint8_t; # include #endif +#include /******************************************************************************/ /* 0MQ errors. */ @@ -212,8 +213,8 @@ ZMQ_EXPORT int zmq_msg_init (zmq_msg_t *msg); ZMQ_EXPORT int zmq_msg_init_size (zmq_msg_t *msg, size_t size); ZMQ_EXPORT int zmq_msg_init_data (zmq_msg_t *msg, void *data, size_t size, zmq_free_fn *ffn, void *hint); -ZMQ_EXPORT int zmq_msg_send (zmq_msg_t *msg, void *s, int flags); -ZMQ_EXPORT int zmq_msg_recv (zmq_msg_t *msg, void *s, int flags); +ZMQ_EXPORT ssize_t zmq_msg_send (zmq_msg_t *msg, void *s, int flags); +ZMQ_EXPORT ssize_t zmq_msg_recv (zmq_msg_t *msg, void *s, int flags); ZMQ_EXPORT int zmq_msg_close (zmq_msg_t *msg); ZMQ_EXPORT int zmq_msg_move (zmq_msg_t *dest, zmq_msg_t *src); ZMQ_EXPORT int zmq_msg_copy (zmq_msg_t *dest, zmq_msg_t *src); @@ -360,9 +361,9 @@ ZMQ_EXPORT int zmq_bind (void *s, const char *addr); ZMQ_EXPORT int zmq_connect (void *s, const char *addr); ZMQ_EXPORT int zmq_unbind (void *s, const char *addr); ZMQ_EXPORT int zmq_disconnect (void *s, const char *addr); -ZMQ_EXPORT int zmq_send (void *s, const void *buf, size_t len, int flags); -ZMQ_EXPORT int zmq_send_const (void *s, const void *buf, size_t len, int flags); -ZMQ_EXPORT int zmq_recv (void *s, void *buf, size_t len, int flags); +ZMQ_EXPORT ssize_t zmq_send (void *s, const void *buf, size_t len, int flags); +ZMQ_EXPORT ssize_t zmq_send_const (void *s, const void *buf, size_t len, int flags); +ZMQ_EXPORT ssize_t zmq_recv (void *s, void *buf, size_t len, int flags); ZMQ_EXPORT int zmq_socket_monitor (void *s, const char *addr, int events); diff --git a/src/zmq.cpp b/src/zmq.cpp index cffcc2a5..a1510380 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -329,37 +329,38 @@ int zmq_disconnect (void *s_, const char *addr_) // Sending functions. -static int +static ssize_t s_sendmsg (zmq::socket_base_t *s_, zmq_msg_t *msg_, int flags_) { - int sz = (int) zmq_msg_size (msg_); + size_t nbytes = zmq_msg_size (msg_); int rc = s_->send ((zmq::msg_t*) msg_, flags_); if (unlikely (rc < 0)) return -1; - return sz; + return nbytes; } /* To be deprecated once zmq_msg_send() is stable */ int zmq_sendmsg (void *s_, zmq_msg_t *msg_, int flags_) { - return zmq_msg_send (msg_, s_, flags_); + // reproduce behaviour of truncated int in deprecated API + return (int) zmq_msg_send (msg_, s_, flags_); } -int zmq_send (void *s_, const void *buf_, size_t len_, int flags_) +ssize_t zmq_send (void *s_, const void *buf_, size_t len_, int flags_) { if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) { errno = ENOTSOCK; return -1; } zmq_msg_t msg; - int rc = zmq_msg_init_size (&msg, len_); - if (rc != 0) + int rc1 = zmq_msg_init_size (&msg, len_); + if (rc1 != 0) return -1; memcpy (zmq_msg_data (&msg), buf_, len_); zmq::socket_base_t *s = (zmq::socket_base_t *) s_; - rc = s_sendmsg (s, &msg, flags_); - if (unlikely (rc < 0)) { + ssize_t nbytes = s_sendmsg (s, &msg, flags_); + if (unlikely (nbytes < 0)) { int err = errno; int rc2 = zmq_msg_close (&msg); errno_assert (rc2 == 0); @@ -369,23 +370,23 @@ int zmq_send (void *s_, const void *buf_, size_t len_, int flags_) // Note the optimisation here. We don't close the msg object as it is // empty anyway. This may change when implementation of zmq_msg_t changes. - return rc; + return nbytes; } -int zmq_send_const (void *s_, const void *buf_, size_t len_, int flags_) +ssize_t zmq_send_const (void *s_, const void *buf_, size_t len_, int flags_) { if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) { errno = ENOTSOCK; return -1; } zmq_msg_t msg; - int rc = zmq_msg_init_data (&msg, (void*)buf_, len_, NULL, NULL); - if (rc != 0) + int rc1 = zmq_msg_init_data (&msg, (void*)buf_, len_, NULL, NULL); + if (rc1 != 0) return -1; zmq::socket_base_t *s = (zmq::socket_base_t *) s_; - rc = s_sendmsg (s, &msg, flags_); - if (unlikely (rc < 0)) { + ssize_t nbytes = s_sendmsg (s, &msg, flags_); + if (unlikely (nbytes < 0)) { int err = errno; int rc2 = zmq_msg_close (&msg); errno_assert (rc2 == 0); @@ -395,7 +396,7 @@ int zmq_send_const (void *s_, const void *buf_, size_t len_, int flags_) // Note the optimisation here. We don't close the msg object as it is // empty anyway. This may change when implementation of zmq_msg_t changes. - return rc; + return nbytes; } @@ -425,8 +426,8 @@ int zmq_sendiov (void *s_, iovec *a_, size_t count_, int flags_) memcpy (zmq_msg_data (&msg), a_[i].iov_base, a_[i].iov_len); if (i == count_ - 1) flags_ = flags_ & ~ZMQ_SNDMORE; - rc = s_sendmsg (s, &msg, flags_); - if (unlikely (rc < 0)) { + ssize_t nbytes = s_sendmsg (s, &msg, flags_); + if (unlikely (nbytes < 0)) { int err = errno; int rc2 = zmq_msg_close (&msg); errno_assert (rc2 == 0); @@ -440,23 +441,24 @@ int zmq_sendiov (void *s_, iovec *a_, size_t count_, int flags_) // Receiving functions. -static int +static ssize_t s_recvmsg (zmq::socket_base_t *s_, zmq_msg_t *msg_, int flags_) { int rc = s_->recv ((zmq::msg_t*) msg_, flags_); if (unlikely (rc < 0)) return -1; - return (int) zmq_msg_size (msg_); + return zmq_msg_size (msg_); } /* To be deprecated once zmq_msg_recv() is stable */ int zmq_recvmsg (void *s_, zmq_msg_t *msg_, int flags_) { - return zmq_msg_recv (msg_, s_, flags_); + // reproduce behaviour of truncated int in deprecated API + return (int) zmq_msg_recv (msg_, s_, flags_); } -int zmq_recv (void *s_, void *buf_, size_t len_, int flags_) +ssize_t zmq_recv (void *s_, void *buf_, size_t len_, int flags_) { if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) { errno = ENOTSOCK; @@ -467,7 +469,7 @@ int zmq_recv (void *s_, void *buf_, size_t len_, int flags_) errno_assert (rc == 0); zmq::socket_base_t *s = (zmq::socket_base_t *) s_; - int nbytes = s_recvmsg (s, &msg, flags_); + ssize_t nbytes = s_recvmsg (s, &msg, flags_); if (unlikely (nbytes < 0)) { int err = errno; rc = zmq_msg_close (&msg); @@ -476,9 +478,7 @@ int zmq_recv (void *s_, void *buf_, size_t len_, int flags_) return -1; } - // At the moment an oversized message is silently truncated. - // TODO: Build in a notification mechanism to report the overflows. - size_t to_copy = size_t (nbytes) < len_ ? size_t (nbytes) : len_; + ssize_t to_copy = (size_t) nbytes < len_ ? nbytes : len_; memcpy (buf_, zmq_msg_data (&msg), to_copy); rc = zmq_msg_close (&msg); @@ -523,7 +523,7 @@ int zmq_recviov (void *s_, iovec *a_, size_t *count_, int flags_) int rc = zmq_msg_init (&msg); errno_assert (rc == 0); - int nbytes = s_recvmsg (s, &msg, flags_); + ssize_t nbytes = s_recvmsg (s, &msg, flags_); if (unlikely (nbytes < 0)) { int err = errno; rc = zmq_msg_close (&msg); @@ -569,26 +569,26 @@ int zmq_msg_init_data (zmq_msg_t *msg_, void *data_, size_t size_, return ((zmq::msg_t*) msg_)->init_data (data_, size_, ffn_, hint_); } -int zmq_msg_send (zmq_msg_t *msg_, void *s_, int flags_) +ssize_t zmq_msg_send (zmq_msg_t *msg_, void *s_, int flags_) { if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) { errno = ENOTSOCK; return -1; } zmq::socket_base_t *s = (zmq::socket_base_t *) s_; - int result = s_sendmsg (s, msg_, flags_); - return result; + ssize_t nbytes = s_sendmsg (s, msg_, flags_); + return nbytes; } -int zmq_msg_recv (zmq_msg_t *msg_, void *s_, int flags_) +ssize_t zmq_msg_recv (zmq_msg_t *msg_, void *s_, int flags_) { if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) { errno = ENOTSOCK; return -1; } zmq::socket_base_t *s = (zmq::socket_base_t *) s_; - int result = s_recvmsg (s, msg_, flags_); - return result; + ssize_t nbytes = s_recvmsg (s, msg_, flags_); + return nbytes; } int zmq_msg_close (zmq_msg_t *msg_)