mirror of
https://github.com/zeromq/libzmq.git
synced 2025-10-30 13:47:13 +01:00
Merge pull request #3145 from sigiesec/analyze
More code style fixes and improvements to static analysis configuration
This commit is contained in:
@@ -9,6 +9,7 @@ Checks: "*,\
|
|||||||
-fuchsia-default-arguments,\
|
-fuchsia-default-arguments,\
|
||||||
-google-readability-todo,\
|
-google-readability-todo,\
|
||||||
-cppcoreguidelines-pro-type-member-init,\
|
-cppcoreguidelines-pro-type-member-init,\
|
||||||
|
-cppcoreguidelines-pro-type-static-cast-downcast,\
|
||||||
# not currently a coding convention, C++11-specific, but conceivable,\
|
# not currently a coding convention, C++11-specific, but conceivable,\
|
||||||
-modernize-use-nullptr,\
|
-modernize-use-nullptr,\
|
||||||
-modernize-use-equals-default,\
|
-modernize-use-equals-default,\
|
||||||
@@ -27,6 +28,7 @@ Checks: "*,\
|
|||||||
# not easily possible to implement (maybe replace by specific exclusions),\
|
# not easily possible to implement (maybe replace by specific exclusions),\
|
||||||
-cppcoreguidelines-pro-type-vararg,\
|
-cppcoreguidelines-pro-type-vararg,\
|
||||||
-cppcoreguidelines-pro-type-reinterpret-cast,\
|
-cppcoreguidelines-pro-type-reinterpret-cast,\
|
||||||
|
-hicpp-signed-bitwise,\
|
||||||
# duplicates,\
|
# duplicates,\
|
||||||
-google-readability-braces-around-statements,\
|
-google-readability-braces-around-statements,\
|
||||||
-cppcoreguidelines-pro-type-cstyle-cast,\
|
-cppcoreguidelines-pro-type-cstyle-cast,\
|
||||||
@@ -38,7 +40,7 @@ Checks: "*,\
|
|||||||
-hicpp-use-auto,\
|
-hicpp-use-auto,\
|
||||||
-hicpp-use-nullptr,\
|
-hicpp-use-nullptr,\
|
||||||
-hicpp-no-array-decay,\
|
-hicpp-no-array-decay,\
|
||||||
-hicpp-member-init"
|
-hicpp-member-init“
|
||||||
WarningsAsErrors: ''
|
WarningsAsErrors: ''
|
||||||
HeaderFilterRegex: ''
|
HeaderFilterRegex: ''
|
||||||
# AnalyzeTemporaryDtors: false
|
# AnalyzeTemporaryDtors: false
|
||||||
|
|||||||
@@ -415,6 +415,7 @@ if (CMAKE_SYSTEM_NAME MATCHES "SunOS" OR CMAKE_SYSTEM_NAME MATCHES "NetBSD")
|
|||||||
endif ()
|
endif ()
|
||||||
endif ()
|
endif ()
|
||||||
|
|
||||||
|
zmq_check_noexcept ()
|
||||||
|
|
||||||
#-----------------------------------------------------------------------------
|
#-----------------------------------------------------------------------------
|
||||||
if (NOT CMAKE_CROSSCOMPILING AND NOT MSVC)
|
if (NOT CMAKE_CROSSCOMPILING AND NOT MSVC)
|
||||||
|
|||||||
@@ -293,3 +293,21 @@ int main (int argc, char *argv [])
|
|||||||
"
|
"
|
||||||
ZMQ_HAVE_GETRANDOM)
|
ZMQ_HAVE_GETRANDOM)
|
||||||
endmacro()
|
endmacro()
|
||||||
|
|
||||||
|
macro(zmq_check_noexcept)
|
||||||
|
message(STATUS "Checking whether noexcept is supported")
|
||||||
|
check_cxx_source_compiles(
|
||||||
|
"
|
||||||
|
struct X
|
||||||
|
{
|
||||||
|
X(int i) noexcept {}
|
||||||
|
};
|
||||||
|
|
||||||
|
int main(int argc, char *argv [])
|
||||||
|
{
|
||||||
|
X x(5);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
"
|
||||||
|
ZMQ_HAVE_NOEXCEPT)
|
||||||
|
endmacro()
|
||||||
|
|||||||
@@ -19,6 +19,8 @@
|
|||||||
#cmakedefine HAVE_MKDTEMP
|
#cmakedefine HAVE_MKDTEMP
|
||||||
#cmakedefine ZMQ_HAVE_UIO
|
#cmakedefine ZMQ_HAVE_UIO
|
||||||
|
|
||||||
|
#cmakedefine ZMQ_HAVE_NOEXCEPT
|
||||||
|
|
||||||
#cmakedefine ZMQ_HAVE_EVENTFD
|
#cmakedefine ZMQ_HAVE_EVENTFD
|
||||||
#cmakedefine ZMQ_HAVE_EVENTFD_CLOEXEC
|
#cmakedefine ZMQ_HAVE_EVENTFD_CLOEXEC
|
||||||
#cmakedefine ZMQ_HAVE_IFADDRS
|
#cmakedefine ZMQ_HAVE_IFADDRS
|
||||||
|
|||||||
@@ -51,73 +51,52 @@ zmq::address_t::address_t (const std::string &protocol_,
|
|||||||
address (address_),
|
address (address_),
|
||||||
parent (parent_)
|
parent (parent_)
|
||||||
{
|
{
|
||||||
memset (&resolved, 0, sizeof resolved);
|
resolved.dummy = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
zmq::address_t::~address_t ()
|
zmq::address_t::~address_t ()
|
||||||
{
|
{
|
||||||
if (protocol == "tcp") {
|
if (protocol == protocol_name::tcp) {
|
||||||
if (resolved.tcp_addr) {
|
LIBZMQ_DELETE (resolved.tcp_addr);
|
||||||
LIBZMQ_DELETE (resolved.tcp_addr);
|
} else if (protocol == protocol_name::udp) {
|
||||||
}
|
LIBZMQ_DELETE (resolved.udp_addr);
|
||||||
}
|
|
||||||
if (protocol == "udp") {
|
|
||||||
if (resolved.udp_addr) {
|
|
||||||
LIBZMQ_DELETE (resolved.udp_addr);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS \
|
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS \
|
||||||
&& !defined ZMQ_HAVE_VXWORKS
|
&& !defined ZMQ_HAVE_VXWORKS
|
||||||
else if (protocol == "ipc") {
|
else if (protocol == protocol_name::ipc) {
|
||||||
if (resolved.ipc_addr) {
|
LIBZMQ_DELETE (resolved.ipc_addr);
|
||||||
LIBZMQ_DELETE (resolved.ipc_addr);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
#if defined ZMQ_HAVE_TIPC
|
#if defined ZMQ_HAVE_TIPC
|
||||||
else if (protocol == "tipc") {
|
else if (protocol == protocol_name::tipc) {
|
||||||
if (resolved.tipc_addr) {
|
LIBZMQ_DELETE (resolved.tipc_addr);
|
||||||
LIBZMQ_DELETE (resolved.tipc_addr);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
#if defined ZMQ_HAVE_VMCI
|
#if defined ZMQ_HAVE_VMCI
|
||||||
else if (protocol == "vmci") {
|
else if (protocol == protocol_name::vmci) {
|
||||||
if (resolved.vmci_addr) {
|
LIBZMQ_DELETE (resolved.vmci_addr);
|
||||||
LIBZMQ_DELETE (resolved.vmci_addr);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
int zmq::address_t::to_string (std::string &addr_) const
|
int zmq::address_t::to_string (std::string &addr_) const
|
||||||
{
|
{
|
||||||
if (protocol == "tcp") {
|
if (protocol == protocol_name::tcp && resolved.tcp_addr)
|
||||||
if (resolved.tcp_addr)
|
return resolved.tcp_addr->to_string (addr_);
|
||||||
return resolved.tcp_addr->to_string (addr_);
|
if (protocol == protocol_name::udp && resolved.udp_addr)
|
||||||
}
|
return resolved.udp_addr->to_string (addr_);
|
||||||
if (protocol == "udp") {
|
|
||||||
if (resolved.udp_addr)
|
|
||||||
return resolved.udp_addr->to_string (addr_);
|
|
||||||
}
|
|
||||||
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS \
|
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS \
|
||||||
&& !defined ZMQ_HAVE_VXWORKS
|
&& !defined ZMQ_HAVE_VXWORKS
|
||||||
else if (protocol == "ipc") {
|
if (protocol == protocol_name::ipc && resolved.ipc_addr)
|
||||||
if (resolved.ipc_addr)
|
return resolved.ipc_addr->to_string (addr_);
|
||||||
return resolved.ipc_addr->to_string (addr_);
|
|
||||||
}
|
|
||||||
#endif
|
#endif
|
||||||
#if defined ZMQ_HAVE_TIPC
|
#if defined ZMQ_HAVE_TIPC
|
||||||
else if (protocol == "tipc") {
|
if (protocol == protocol_name::tipc && resolved.tipc_addr)
|
||||||
if (resolved.tipc_addr)
|
return resolved.tipc_addr->to_string (addr_);
|
||||||
return resolved.tipc_addr->to_string (addr_);
|
|
||||||
}
|
|
||||||
#endif
|
#endif
|
||||||
#if defined ZMQ_HAVE_VMCI
|
#if defined ZMQ_HAVE_VMCI
|
||||||
else if (protocol == "vmci") {
|
if (protocol == protocol_name::vmci && resolved.vmci_addr)
|
||||||
if (resolved.vmci_addr)
|
return resolved.vmci_addr->to_string (addr_);
|
||||||
return resolved.vmci_addr->to_string (addr_);
|
|
||||||
}
|
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
if (!protocol.empty () && !address.empty ()) {
|
if (!protocol.empty () && !address.empty ()) {
|
||||||
|
|||||||
@@ -46,6 +46,23 @@ class tipc_address_t;
|
|||||||
#if defined ZMQ_HAVE_VMCI
|
#if defined ZMQ_HAVE_VMCI
|
||||||
class vmci_address_t;
|
class vmci_address_t;
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
namespace protocol_name
|
||||||
|
{
|
||||||
|
static const char tcp[] = "tcp";
|
||||||
|
static const char udp[] = "udp";
|
||||||
|
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS \
|
||||||
|
&& !defined ZMQ_HAVE_VXWORKS
|
||||||
|
static const char ipc[] = "ipc";
|
||||||
|
#endif
|
||||||
|
#if defined ZMQ_HAVE_TIPC
|
||||||
|
static const char tipc[] = "tipc";
|
||||||
|
#endif
|
||||||
|
#if defined ZMQ_HAVE_VMCI
|
||||||
|
static const char vmci[] = "vmci";
|
||||||
|
#endif
|
||||||
|
}
|
||||||
|
|
||||||
struct address_t
|
struct address_t
|
||||||
{
|
{
|
||||||
address_t (const std::string &protocol_,
|
address_t (const std::string &protocol_,
|
||||||
@@ -59,8 +76,10 @@ struct address_t
|
|||||||
ctx_t *const parent;
|
ctx_t *const parent;
|
||||||
|
|
||||||
// Protocol specific resolved address
|
// Protocol specific resolved address
|
||||||
|
// All members must be pointers to allow for consistent initialization
|
||||||
union
|
union
|
||||||
{
|
{
|
||||||
|
void *dummy;
|
||||||
tcp_address_t *tcp_addr;
|
tcp_address_t *tcp_addr;
|
||||||
udp_address_t *udp_addr;
|
udp_address_t *udp_addr;
|
||||||
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS \
|
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS \
|
||||||
|
|||||||
@@ -66,6 +66,14 @@
|
|||||||
#include <arch/atomic.h>
|
#include <arch/atomic.h>
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
#if !defined ZMQ_NOEXCEPT
|
||||||
|
#if defined ZMQ_HAVE_NOEXCEPT
|
||||||
|
#define ZMQ_NOEXCEPT noexcept
|
||||||
|
#else
|
||||||
|
#define ZMQ_NOEXCEPT
|
||||||
|
#endif
|
||||||
|
#endif
|
||||||
|
|
||||||
namespace zmq
|
namespace zmq
|
||||||
{
|
{
|
||||||
// This class represents an integer that can be incremented/decremented
|
// This class represents an integer that can be incremented/decremented
|
||||||
@@ -90,15 +98,16 @@ class atomic_counter_t
|
|||||||
public:
|
public:
|
||||||
typedef uint32_t integer_t;
|
typedef uint32_t integer_t;
|
||||||
|
|
||||||
inline atomic_counter_t (integer_t value_ = 0) : _value (value_) {}
|
inline atomic_counter_t (integer_t value_ = 0) ZMQ_NOEXCEPT
|
||||||
|
: _value (value_)
|
||||||
inline ~atomic_counter_t () {}
|
{
|
||||||
|
}
|
||||||
|
|
||||||
// Set counter _value (not thread-safe).
|
// Set counter _value (not thread-safe).
|
||||||
inline void set (integer_t value_) { _value = value_; }
|
inline void set (integer_t value_) ZMQ_NOEXCEPT { _value = value_; }
|
||||||
|
|
||||||
// Atomic addition. Returns the old _value.
|
// Atomic addition. Returns the old _value.
|
||||||
inline integer_t add (integer_t increment_)
|
inline integer_t add (integer_t increment_) ZMQ_NOEXCEPT
|
||||||
{
|
{
|
||||||
integer_t old_value;
|
integer_t old_value;
|
||||||
|
|
||||||
@@ -143,7 +152,7 @@ class atomic_counter_t
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Atomic subtraction. Returns false if the counter drops to zero.
|
// Atomic subtraction. Returns false if the counter drops to zero.
|
||||||
inline bool sub (integer_t decrement_)
|
inline bool sub (integer_t decrement_) ZMQ_NOEXCEPT
|
||||||
{
|
{
|
||||||
#if defined ZMQ_ATOMIC_COUNTER_WINDOWS
|
#if defined ZMQ_ATOMIC_COUNTER_WINDOWS
|
||||||
LONG delta = -((LONG) decrement_);
|
LONG delta = -((LONG) decrement_);
|
||||||
@@ -198,7 +207,7 @@ class atomic_counter_t
|
|||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
inline integer_t get () const { return _value; }
|
inline integer_t get () const ZMQ_NOEXCEPT { return _value; }
|
||||||
|
|
||||||
private:
|
private:
|
||||||
#if defined ZMQ_ATOMIC_COUNTER_CXX11
|
#if defined ZMQ_ATOMIC_COUNTER_CXX11
|
||||||
|
|||||||
@@ -64,6 +64,14 @@
|
|||||||
#include <arch/atomic.h>
|
#include <arch/atomic.h>
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
#if !defined ZMQ_NOEXCEPT
|
||||||
|
#if defined ZMQ_HAVE_NOEXCEPT
|
||||||
|
#define ZMQ_NOEXCEPT noexcept
|
||||||
|
#else
|
||||||
|
#define ZMQ_NOEXCEPT
|
||||||
|
#endif
|
||||||
|
#endif
|
||||||
|
|
||||||
namespace zmq
|
namespace zmq
|
||||||
{
|
{
|
||||||
#if !defined ZMQ_ATOMIC_PTR_CXX11
|
#if !defined ZMQ_ATOMIC_PTR_CXX11
|
||||||
@@ -73,7 +81,7 @@ inline void *atomic_xchg_ptr (void **ptr_,
|
|||||||
,
|
,
|
||||||
mutex_t &_sync
|
mutex_t &_sync
|
||||||
#endif
|
#endif
|
||||||
)
|
) ZMQ_NOEXCEPT
|
||||||
{
|
{
|
||||||
#if defined ZMQ_ATOMIC_PTR_WINDOWS
|
#if defined ZMQ_ATOMIC_PTR_WINDOWS
|
||||||
return InterlockedExchangePointer ((PVOID *) ptr_, val_);
|
return InterlockedExchangePointer ((PVOID *) ptr_, val_);
|
||||||
@@ -120,7 +128,7 @@ inline void *atomic_cas (void *volatile *ptr_,
|
|||||||
,
|
,
|
||||||
mutex_t &_sync
|
mutex_t &_sync
|
||||||
#endif
|
#endif
|
||||||
)
|
) ZMQ_NOEXCEPT
|
||||||
{
|
{
|
||||||
#if defined ZMQ_ATOMIC_PTR_WINDOWS
|
#if defined ZMQ_ATOMIC_PTR_WINDOWS
|
||||||
return InterlockedCompareExchangePointer ((volatile PVOID *) ptr_, val_,
|
return InterlockedCompareExchangePointer ((volatile PVOID *) ptr_, val_,
|
||||||
@@ -176,19 +184,16 @@ template <typename T> class atomic_ptr_t
|
|||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
// Initialise atomic pointer
|
// Initialise atomic pointer
|
||||||
inline atomic_ptr_t () { _ptr = NULL; }
|
inline atomic_ptr_t () ZMQ_NOEXCEPT { _ptr = NULL; }
|
||||||
|
|
||||||
// Destroy atomic pointer
|
|
||||||
inline ~atomic_ptr_t () {}
|
|
||||||
|
|
||||||
// Set value of atomic pointer in a non-threadsafe way
|
// Set value of atomic pointer in a non-threadsafe way
|
||||||
// Use this function only when you are sure that at most one
|
// Use this function only when you are sure that at most one
|
||||||
// thread is accessing the pointer at the moment.
|
// thread is accessing the pointer at the moment.
|
||||||
inline void set (T *ptr_) { _ptr = ptr_; }
|
inline void set (T *ptr_) ZMQ_NOEXCEPT { _ptr = ptr_; }
|
||||||
|
|
||||||
// Perform atomic 'exchange pointers' operation. Pointer is set
|
// Perform atomic 'exchange pointers' operation. Pointer is set
|
||||||
// to the 'val_' value. Old value is returned.
|
// to the 'val_' value. Old value is returned.
|
||||||
inline T *xchg (T *val_)
|
inline T *xchg (T *val_) ZMQ_NOEXCEPT
|
||||||
{
|
{
|
||||||
#if defined ZMQ_ATOMIC_PTR_CXX11
|
#if defined ZMQ_ATOMIC_PTR_CXX11
|
||||||
return _ptr.exchange (val_, std::memory_order_acq_rel);
|
return _ptr.exchange (val_, std::memory_order_acq_rel);
|
||||||
@@ -206,7 +211,7 @@ template <typename T> class atomic_ptr_t
|
|||||||
// The pointer is compared to 'cmp' argument and if they are
|
// The pointer is compared to 'cmp' argument and if they are
|
||||||
// equal, its value is set to 'val_'. Old value of the pointer
|
// equal, its value is set to 'val_'. Old value of the pointer
|
||||||
// is returned.
|
// is returned.
|
||||||
inline T *cas (T *cmp_, T *val_)
|
inline T *cas (T *cmp_, T *val_) ZMQ_NOEXCEPT
|
||||||
{
|
{
|
||||||
#if defined ZMQ_ATOMIC_PTR_CXX11
|
#if defined ZMQ_ATOMIC_PTR_CXX11
|
||||||
_ptr.compare_exchange_strong (cmp_, val_, std::memory_order_acq_rel);
|
_ptr.compare_exchange_strong (cmp_, val_, std::memory_order_acq_rel);
|
||||||
@@ -240,11 +245,14 @@ template <typename T> class atomic_ptr_t
|
|||||||
|
|
||||||
struct atomic_value_t
|
struct atomic_value_t
|
||||||
{
|
{
|
||||||
atomic_value_t (const int value_) : _value (value_) {}
|
atomic_value_t (const int value_) ZMQ_NOEXCEPT : _value (value_) {}
|
||||||
|
|
||||||
atomic_value_t (const atomic_value_t &src_) : _value (src_.load ()) {}
|
atomic_value_t (const atomic_value_t &src_) ZMQ_NOEXCEPT
|
||||||
|
: _value (src_.load ())
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
void store (const int value_)
|
void store (const int value_) ZMQ_NOEXCEPT
|
||||||
{
|
{
|
||||||
#if defined ZMQ_ATOMIC_PTR_CXX11
|
#if defined ZMQ_ATOMIC_PTR_CXX11
|
||||||
_value.store (value_, std::memory_order_release);
|
_value.store (value_, std::memory_order_release);
|
||||||
@@ -258,7 +266,7 @@ struct atomic_value_t
|
|||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
int load () const
|
int load () const ZMQ_NOEXCEPT
|
||||||
{
|
{
|
||||||
#if defined ZMQ_ATOMIC_PTR_CXX11
|
#if defined ZMQ_ATOMIC_PTR_CXX11
|
||||||
return _value.load (std::memory_order_acquire);
|
return _value.load (std::memory_order_acquire);
|
||||||
|
|||||||
@@ -54,14 +54,14 @@ namespace zmq
|
|||||||
template <typename T> class encoder_base_t : public i_encoder
|
template <typename T> class encoder_base_t : public i_encoder
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
inline encoder_base_t (size_t bufsize_) :
|
inline explicit encoder_base_t (size_t bufsize_) :
|
||||||
_write_pos (0),
|
_write_pos (0),
|
||||||
_to_write (0),
|
_to_write (0),
|
||||||
_next (NULL),
|
_next (NULL),
|
||||||
_new_msg_flag (false),
|
_new_msg_flag (false),
|
||||||
_buf_size (bufsize_),
|
_buf_size (bufsize_),
|
||||||
_buf (static_cast<unsigned char *> (malloc (bufsize_))),
|
_buf (static_cast<unsigned char *> (malloc (bufsize_))),
|
||||||
in_progress (NULL)
|
_in_progress (NULL)
|
||||||
{
|
{
|
||||||
alloc_assert (_buf);
|
alloc_assert (_buf);
|
||||||
}
|
}
|
||||||
@@ -78,7 +78,7 @@ template <typename T> class encoder_base_t : public i_encoder
|
|||||||
unsigned char *buffer = !*data_ ? _buf : *data_;
|
unsigned char *buffer = !*data_ ? _buf : *data_;
|
||||||
size_t buffersize = !*data_ ? _buf_size : size_;
|
size_t buffersize = !*data_ ? _buf_size : size_;
|
||||||
|
|
||||||
if (in_progress == NULL)
|
if (in_progress () == NULL)
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
size_t pos = 0;
|
size_t pos = 0;
|
||||||
@@ -88,11 +88,11 @@ template <typename T> class encoder_base_t : public i_encoder
|
|||||||
// in the buffer.
|
// in the buffer.
|
||||||
if (!_to_write) {
|
if (!_to_write) {
|
||||||
if (_new_msg_flag) {
|
if (_new_msg_flag) {
|
||||||
int rc = in_progress->close ();
|
int rc = _in_progress->close ();
|
||||||
errno_assert (rc == 0);
|
errno_assert (rc == 0);
|
||||||
rc = in_progress->init ();
|
rc = _in_progress->init ();
|
||||||
errno_assert (rc == 0);
|
errno_assert (rc == 0);
|
||||||
in_progress = NULL;
|
_in_progress = NULL;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
(static_cast<T *> (this)->*_next) ();
|
(static_cast<T *> (this)->*_next) ();
|
||||||
@@ -130,8 +130,8 @@ template <typename T> class encoder_base_t : public i_encoder
|
|||||||
|
|
||||||
void load_msg (msg_t *msg_)
|
void load_msg (msg_t *msg_)
|
||||||
{
|
{
|
||||||
zmq_assert (in_progress == NULL);
|
zmq_assert (in_progress () == NULL);
|
||||||
in_progress = msg_;
|
_in_progress = msg_;
|
||||||
(static_cast<T *> (this)->*_next) ();
|
(static_cast<T *> (this)->*_next) ();
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -152,6 +152,8 @@ template <typename T> class encoder_base_t : public i_encoder
|
|||||||
_new_msg_flag = new_msg_flag_;
|
_new_msg_flag = new_msg_flag_;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
msg_t *in_progress () { return _in_progress; }
|
||||||
|
|
||||||
private:
|
private:
|
||||||
// Where to get the data to write from.
|
// Where to get the data to write from.
|
||||||
unsigned char *_write_pos;
|
unsigned char *_write_pos;
|
||||||
@@ -172,8 +174,7 @@ template <typename T> class encoder_base_t : public i_encoder
|
|||||||
encoder_base_t (const encoder_base_t &);
|
encoder_base_t (const encoder_base_t &);
|
||||||
void operator= (const encoder_base_t &);
|
void operator= (const encoder_base_t &);
|
||||||
|
|
||||||
protected:
|
msg_t *_in_progress;
|
||||||
msg_t *in_progress;
|
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -65,7 +65,7 @@ class io_object_t : public i_poll_events
|
|||||||
void reset_pollin (handle_t handle_);
|
void reset_pollin (handle_t handle_);
|
||||||
void set_pollout (handle_t handle_);
|
void set_pollout (handle_t handle_);
|
||||||
void reset_pollout (handle_t handle_);
|
void reset_pollout (handle_t handle_);
|
||||||
void add_timer (int timout_, int id_);
|
void add_timer (int timeout_, int id_);
|
||||||
void cancel_timer (int id_);
|
void cancel_timer (int id_);
|
||||||
|
|
||||||
// i_poll_events interface implementation.
|
// i_poll_events interface implementation.
|
||||||
|
|||||||
@@ -667,14 +667,14 @@ void zmq::make_socket_noninheritable (fd_t sock_)
|
|||||||
const BOOL brc = SetHandleInformation (reinterpret_cast<HANDLE> (sock_),
|
const BOOL brc = SetHandleInformation (reinterpret_cast<HANDLE> (sock_),
|
||||||
HANDLE_FLAG_INHERIT, 0);
|
HANDLE_FLAG_INHERIT, 0);
|
||||||
win_assert (brc);
|
win_assert (brc);
|
||||||
#endif
|
#elif (!defined ZMQ_HAVE_SOCK_CLOEXEC || !defined HAVE_ACCEPT4) \
|
||||||
|
|
||||||
#if (!defined ZMQ_HAVE_SOCK_CLOEXEC || !defined HAVE_ACCEPT4) \
|
|
||||||
&& defined FD_CLOEXEC
|
&& defined FD_CLOEXEC
|
||||||
// If there 's no SOCK_CLOEXEC, let's try the second best option.
|
// If there 's no SOCK_CLOEXEC, let's try the second best option.
|
||||||
// Race condition can cause socket not to be closed (if fork happens
|
// Race condition can cause socket not to be closed (if fork happens
|
||||||
// between accept and this point).
|
// between accept and this point).
|
||||||
const int rc = fcntl (sock_, F_SETFD, FD_CLOEXEC);
|
const int rc = fcntl (sock_, F_SETFD, FD_CLOEXEC);
|
||||||
errno_assert (rc != -1);
|
errno_assert (rc != -1);
|
||||||
|
#else
|
||||||
|
LIBZMQ_UNUSED (sock_);
|
||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -88,6 +88,17 @@ class ip_resolver_t
|
|||||||
int resolve (ip_addr_t *ip_addr_, const char *name_);
|
int resolve (ip_addr_t *ip_addr_, const char *name_);
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
|
// Virtual functions that are overridden in tests
|
||||||
|
virtual int do_getaddrinfo (const char *node_,
|
||||||
|
const char *service_,
|
||||||
|
const struct addrinfo *hints_,
|
||||||
|
struct addrinfo **res_);
|
||||||
|
|
||||||
|
virtual void do_freeaddrinfo (struct addrinfo *res_);
|
||||||
|
|
||||||
|
virtual unsigned int do_if_nametoindex (const char *ifname_);
|
||||||
|
|
||||||
|
private:
|
||||||
ip_resolver_options_t _options;
|
ip_resolver_options_t _options;
|
||||||
|
|
||||||
int resolve_nic_name (ip_addr_t *ip_addr_, const char *nic_);
|
int resolve_nic_name (ip_addr_t *ip_addr_, const char *nic_);
|
||||||
@@ -97,16 +108,6 @@ class ip_resolver_t
|
|||||||
int get_interface_name (unsigned long index_, char **dest_) const;
|
int get_interface_name (unsigned long index_, char **dest_) const;
|
||||||
int wchar_to_utf8 (const WCHAR *src_, char **dest_) const;
|
int wchar_to_utf8 (const WCHAR *src_, char **dest_) const;
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
// Virtual functions that are overriden in tests
|
|
||||||
virtual int do_getaddrinfo (const char *node_,
|
|
||||||
const char *service_,
|
|
||||||
const struct addrinfo *hints_,
|
|
||||||
struct addrinfo **res_);
|
|
||||||
|
|
||||||
virtual void do_freeaddrinfo (struct addrinfo *res_);
|
|
||||||
|
|
||||||
virtual unsigned int do_if_nametoindex (const char *ifname_);
|
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -43,13 +43,13 @@ zmq::ipc_address_t::ipc_address_t ()
|
|||||||
memset (&address, 0, sizeof address);
|
memset (&address, 0, sizeof address);
|
||||||
}
|
}
|
||||||
|
|
||||||
zmq::ipc_address_t::ipc_address_t (const sockaddr *sa, socklen_t sa_len)
|
zmq::ipc_address_t::ipc_address_t (const sockaddr *sa_, socklen_t sa_len_)
|
||||||
{
|
{
|
||||||
zmq_assert (sa && sa_len > 0);
|
zmq_assert (sa_ && sa_len_ > 0);
|
||||||
|
|
||||||
memset (&address, 0, sizeof address);
|
memset (&address, 0, sizeof address);
|
||||||
if (sa->sa_family == AF_UNIX)
|
if (sa_->sa_family == AF_UNIX)
|
||||||
memcpy (&address, sa, sa_len);
|
memcpy (&address, sa_, sa_len_);
|
||||||
}
|
}
|
||||||
|
|
||||||
zmq::ipc_address_t::~ipc_address_t ()
|
zmq::ipc_address_t::~ipc_address_t ()
|
||||||
|
|||||||
@@ -44,7 +44,7 @@ class ipc_address_t
|
|||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
ipc_address_t ();
|
ipc_address_t ();
|
||||||
ipc_address_t (const sockaddr *sa, socklen_t sa_len);
|
ipc_address_t (const sockaddr *sa_, socklen_t sa_len_);
|
||||||
~ipc_address_t ();
|
~ipc_address_t ();
|
||||||
|
|
||||||
// This function sets up the address for UNIX domain transport.
|
// This function sets up the address for UNIX domain transport.
|
||||||
|
|||||||
@@ -67,7 +67,7 @@ zmq::ipc_connecter_t::ipc_connecter_t (class io_thread_t *io_thread_,
|
|||||||
current_reconnect_ivl (options.reconnect_ivl)
|
current_reconnect_ivl (options.reconnect_ivl)
|
||||||
{
|
{
|
||||||
zmq_assert (addr);
|
zmq_assert (addr);
|
||||||
zmq_assert (addr->protocol == "ipc");
|
zmq_assert (addr->protocol == protocol_name::ipc);
|
||||||
addr->to_string (endpoint);
|
addr->to_string (endpoint);
|
||||||
socket = session->get_socket ();
|
socket = session->get_socket ();
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -60,12 +60,12 @@ void zmq::mechanism_t::peer_routing_id (msg_t *msg_)
|
|||||||
msg_->set_flags (msg_t::routing_id);
|
msg_->set_flags (msg_t::routing_id);
|
||||||
}
|
}
|
||||||
|
|
||||||
void zmq::mechanism_t::set_user_id (const void *data_, size_t size_)
|
void zmq::mechanism_t::set_user_id (const void *user_id_, size_t size_)
|
||||||
{
|
{
|
||||||
_user_id.set (static_cast<const unsigned char *> (data_), size_);
|
_user_id.set (static_cast<const unsigned char *> (user_id_), size_);
|
||||||
_zap_properties.ZMQ_MAP_INSERT_OR_EMPLACE (
|
_zap_properties.ZMQ_MAP_INSERT_OR_EMPLACE (
|
||||||
std::string (ZMQ_MSG_PROPERTY_USER_ID),
|
std::string (ZMQ_MSG_PROPERTY_USER_ID),
|
||||||
std::string (reinterpret_cast<const char *> (data_), size_));
|
std::string (reinterpret_cast<const char *> (user_id_), size_));
|
||||||
}
|
}
|
||||||
|
|
||||||
const zmq::blob_t &zmq::mechanism_t::get_user_id () const
|
const zmq::blob_t &zmq::mechanism_t::get_user_id () const
|
||||||
@@ -159,20 +159,20 @@ size_t zmq::mechanism_t::property_len (const char *name_, size_t value_len_)
|
|||||||
#define ZMTP_PROPERTY_SOCKET_TYPE "Socket-Type"
|
#define ZMTP_PROPERTY_SOCKET_TYPE "Socket-Type"
|
||||||
#define ZMTP_PROPERTY_IDENTITY "Identity"
|
#define ZMTP_PROPERTY_IDENTITY "Identity"
|
||||||
|
|
||||||
size_t zmq::mechanism_t::add_basic_properties (unsigned char *buf_,
|
size_t zmq::mechanism_t::add_basic_properties (unsigned char *ptr_,
|
||||||
size_t buf_capacity_) const
|
size_t ptr_capacity_) const
|
||||||
{
|
{
|
||||||
unsigned char *ptr = buf_;
|
unsigned char *ptr = ptr_;
|
||||||
|
|
||||||
// Add socket type property
|
// Add socket type property
|
||||||
const char *socket_type = socket_type_string (options.type);
|
const char *socket_type = socket_type_string (options.type);
|
||||||
ptr += add_property (ptr, buf_capacity_, ZMTP_PROPERTY_SOCKET_TYPE,
|
ptr += add_property (ptr, ptr_capacity_, ZMTP_PROPERTY_SOCKET_TYPE,
|
||||||
socket_type, strlen (socket_type));
|
socket_type, strlen (socket_type));
|
||||||
|
|
||||||
// Add identity (aka routing id) property
|
// Add identity (aka routing id) property
|
||||||
if (options.type == ZMQ_REQ || options.type == ZMQ_DEALER
|
if (options.type == ZMQ_REQ || options.type == ZMQ_DEALER
|
||||||
|| options.type == ZMQ_ROUTER) {
|
|| options.type == ZMQ_ROUTER) {
|
||||||
ptr += add_property (ptr, buf_capacity_ - (ptr - buf_),
|
ptr += add_property (ptr, ptr_capacity_ - (ptr - ptr_),
|
||||||
ZMTP_PROPERTY_IDENTITY, options.routing_id,
|
ZMTP_PROPERTY_IDENTITY, options.routing_id,
|
||||||
options.routing_id_size);
|
options.routing_id_size);
|
||||||
}
|
}
|
||||||
@@ -182,10 +182,10 @@ size_t zmq::mechanism_t::add_basic_properties (unsigned char *buf_,
|
|||||||
options.app_metadata.begin ();
|
options.app_metadata.begin ();
|
||||||
it != options.app_metadata.end (); ++it)
|
it != options.app_metadata.end (); ++it)
|
||||||
ptr +=
|
ptr +=
|
||||||
add_property (ptr, buf_capacity_ - (ptr - buf_), it->first.c_str (),
|
add_property (ptr, ptr_capacity_ - (ptr - ptr_), it->first.c_str (),
|
||||||
it->second.c_str (), strlen (it->second.c_str ()));
|
it->second.c_str (), strlen (it->second.c_str ()));
|
||||||
|
|
||||||
return ptr - buf_;
|
return ptr - ptr_;
|
||||||
}
|
}
|
||||||
|
|
||||||
size_t zmq::mechanism_t::basic_properties_len () const
|
size_t zmq::mechanism_t::basic_properties_len () const
|
||||||
|
|||||||
11
src/msg.cpp
11
src/msg.cpp
@@ -59,19 +59,18 @@ int zmq::msg_t::init (void *data_,
|
|||||||
content_t *content_)
|
content_t *content_)
|
||||||
{
|
{
|
||||||
if (size_ < max_vsm_size) {
|
if (size_ < max_vsm_size) {
|
||||||
int const rc = init_size (size_);
|
const int rc = init_size (size_);
|
||||||
|
|
||||||
if (rc != -1) {
|
if (rc != -1) {
|
||||||
memcpy (data (), data_, size_);
|
memcpy (data (), data_, size_);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
return -1;
|
return -1;
|
||||||
|
|
||||||
} else if (content_) {
|
|
||||||
return init_external_storage (content_, data_, size_, ffn_, hint_);
|
|
||||||
} else {
|
|
||||||
return init_data (data_, size_, ffn_, hint_);
|
|
||||||
}
|
}
|
||||||
|
if (content_) {
|
||||||
|
return init_external_storage (content_, data_, size_, ffn_, hint_);
|
||||||
|
}
|
||||||
|
return init_data (data_, size_, ffn_, hint_);
|
||||||
}
|
}
|
||||||
|
|
||||||
int zmq::msg_t::init ()
|
int zmq::msg_t::init ()
|
||||||
|
|||||||
@@ -183,16 +183,20 @@ int do_setsockopt_set (const void *const optval_,
|
|||||||
if (optvallen_ == 0 && optval_ == NULL) {
|
if (optvallen_ == 0 && optval_ == NULL) {
|
||||||
set_->clear ();
|
set_->clear ();
|
||||||
return 0;
|
return 0;
|
||||||
} else if (optvallen_ == sizeof (T) && optval_ != NULL) {
|
}
|
||||||
set_->insert (*((const T *) optval_));
|
if (optvallen_ == sizeof (T) && optval_ != NULL) {
|
||||||
|
set_->insert (*(static_cast<const T *> (optval_)));
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
return sockopt_invalid ();
|
return sockopt_invalid ();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO why is 1000 a sensible default?
|
||||||
|
const int default_hwm = 1000;
|
||||||
|
|
||||||
zmq::options_t::options_t () :
|
zmq::options_t::options_t () :
|
||||||
sndhwm (1000),
|
sndhwm (default_hwm),
|
||||||
rcvhwm (1000),
|
rcvhwm (default_hwm),
|
||||||
affinity (0),
|
affinity (0),
|
||||||
routing_id_size (0),
|
routing_id_size (0),
|
||||||
rate (100),
|
rate (100),
|
||||||
@@ -321,7 +325,7 @@ int zmq::options_t::setsockopt (int option_,
|
|||||||
|
|
||||||
case ZMQ_ROUTING_ID:
|
case ZMQ_ROUTING_ID:
|
||||||
// Routing id is any binary string from 1 to 255 octets
|
// Routing id is any binary string from 1 to 255 octets
|
||||||
if (optvallen_ > 0 && optvallen_ < 256) {
|
if (optvallen_ > 0 && optvallen_ <= UCHAR_MAX) {
|
||||||
routing_id_size = static_cast<unsigned char> (optvallen_);
|
routing_id_size = static_cast<unsigned char> (optvallen_);
|
||||||
memcpy (routing_id, optval_, routing_id_size);
|
memcpy (routing_id, optval_, routing_id_size);
|
||||||
return 0;
|
return 0;
|
||||||
@@ -538,7 +542,8 @@ int zmq::options_t::setsockopt (int option_,
|
|||||||
if (optvallen_ == 0 && optval_ == NULL) {
|
if (optvallen_ == 0 && optval_ == NULL) {
|
||||||
mechanism = ZMQ_NULL;
|
mechanism = ZMQ_NULL;
|
||||||
return 0;
|
return 0;
|
||||||
} else if (optvallen_ > 0 && optvallen_ < 256 && optval_ != NULL) {
|
} else if (optvallen_ > 0 && optvallen_ <= UCHAR_MAX
|
||||||
|
&& optval_ != NULL) {
|
||||||
plain_username.assign (static_cast<const char *> (optval_),
|
plain_username.assign (static_cast<const char *> (optval_),
|
||||||
optvallen_);
|
optvallen_);
|
||||||
as_server = 0;
|
as_server = 0;
|
||||||
@@ -551,7 +556,8 @@ int zmq::options_t::setsockopt (int option_,
|
|||||||
if (optvallen_ == 0 && optval_ == NULL) {
|
if (optvallen_ == 0 && optval_ == NULL) {
|
||||||
mechanism = ZMQ_NULL;
|
mechanism = ZMQ_NULL;
|
||||||
return 0;
|
return 0;
|
||||||
} else if (optvallen_ > 0 && optvallen_ < 256 && optval_ != NULL) {
|
} else if (optvallen_ > 0 && optvallen_ <= UCHAR_MAX
|
||||||
|
&& optval_ != NULL) {
|
||||||
plain_password.assign (static_cast<const char *> (optval_),
|
plain_password.assign (static_cast<const char *> (optval_),
|
||||||
optvallen_);
|
optvallen_);
|
||||||
as_server = 0;
|
as_server = 0;
|
||||||
@@ -610,7 +616,7 @@ int zmq::options_t::setsockopt (int option_,
|
|||||||
break;
|
break;
|
||||||
|
|
||||||
case ZMQ_GSSAPI_PRINCIPAL:
|
case ZMQ_GSSAPI_PRINCIPAL:
|
||||||
if (optvallen_ > 0 && optvallen_ < 256 && optval_ != NULL) {
|
if (optvallen_ > 0 && optvallen_ <= UCHAR_MAX && optval_ != NULL) {
|
||||||
gss_principal.assign ((const char *) optval_, optvallen_);
|
gss_principal.assign ((const char *) optval_, optvallen_);
|
||||||
mechanism = ZMQ_GSSAPI;
|
mechanism = ZMQ_GSSAPI;
|
||||||
return 0;
|
return 0;
|
||||||
@@ -618,7 +624,7 @@ int zmq::options_t::setsockopt (int option_,
|
|||||||
break;
|
break;
|
||||||
|
|
||||||
case ZMQ_GSSAPI_SERVICE_PRINCIPAL:
|
case ZMQ_GSSAPI_SERVICE_PRINCIPAL:
|
||||||
if (optvallen_ > 0 && optvallen_ < 256 && optval_ != NULL) {
|
if (optvallen_ > 0 && optvallen_ <= UCHAR_MAX && optval_ != NULL) {
|
||||||
gss_service_principal.assign ((const char *) optval_,
|
gss_service_principal.assign ((const char *) optval_,
|
||||||
optvallen_);
|
optvallen_);
|
||||||
mechanism = ZMQ_GSSAPI;
|
mechanism = ZMQ_GSSAPI;
|
||||||
@@ -721,11 +727,11 @@ int zmq::options_t::setsockopt (int option_,
|
|||||||
|
|
||||||
case ZMQ_METADATA:
|
case ZMQ_METADATA:
|
||||||
if (optvallen_ > 0 && !is_int) {
|
if (optvallen_ > 0 && !is_int) {
|
||||||
const std::string s (reinterpret_cast<const char *> (optval_));
|
const std::string s (static_cast<const char *> (optval_));
|
||||||
const size_t pos = s.find (":");
|
const size_t pos = s.find (':');
|
||||||
if (pos != std::string::npos && pos != 0
|
if (pos != std::string::npos && pos != 0
|
||||||
&& pos != s.length () - 1) {
|
&& pos != s.length () - 1) {
|
||||||
std::string key = s.substr (0, pos);
|
const std::string key = s.substr (0, pos);
|
||||||
if (key.compare (0, 2, "X-") == 0
|
if (key.compare (0, 2, "X-") == 0
|
||||||
&& key.length () <= UCHAR_MAX) {
|
&& key.length () <= UCHAR_MAX) {
|
||||||
std::string val = s.substr (pos + 1, s.length ());
|
std::string val = s.substr (pos + 1, s.length ());
|
||||||
@@ -773,7 +779,7 @@ int zmq::options_t::getsockopt (int option_,
|
|||||||
void *optval_,
|
void *optval_,
|
||||||
size_t *optvallen_) const
|
size_t *optvallen_) const
|
||||||
{
|
{
|
||||||
bool is_int = (*optvallen_ == sizeof (int));
|
const bool is_int = (*optvallen_ == sizeof (int));
|
||||||
int *value = static_cast<int *> (optval_);
|
int *value = static_cast<int *> (optval_);
|
||||||
#if defined(ZMQ_ACT_MILITANT)
|
#if defined(ZMQ_ACT_MILITANT)
|
||||||
bool malformed = true; // Did caller pass a bad option value?
|
bool malformed = true; // Did caller pass a bad option value?
|
||||||
@@ -1156,9 +1162,3 @@ int zmq::options_t::getsockopt (int option_,
|
|||||||
errno = EINVAL;
|
errno = EINVAL;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool zmq::options_t::is_valid (int option_) const
|
|
||||||
{
|
|
||||||
LIBZMQ_UNUSED (option_);
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|||||||
@@ -69,8 +69,6 @@ struct options_t
|
|||||||
int setsockopt (int option_, const void *optval_, size_t optvallen_);
|
int setsockopt (int option_, const void *optval_, size_t optvallen_);
|
||||||
int getsockopt (int option_, void *optval_, size_t *optvallen_) const;
|
int getsockopt (int option_, void *optval_, size_t *optvallen_) const;
|
||||||
|
|
||||||
bool is_valid (int option_) const;
|
|
||||||
|
|
||||||
// High-water marks for message pipes.
|
// High-water marks for message pipes.
|
||||||
int sndhwm;
|
int sndhwm;
|
||||||
int rcvhwm;
|
int rcvhwm;
|
||||||
|
|||||||
@@ -45,6 +45,6 @@ zmq::raw_encoder_t::~raw_encoder_t ()
|
|||||||
|
|
||||||
void zmq::raw_encoder_t::raw_message_ready ()
|
void zmq::raw_encoder_t::raw_message_ready ()
|
||||||
{
|
{
|
||||||
next_step (in_progress->data (), in_progress->size (),
|
next_step (in_progress ()->data (), in_progress ()->size (),
|
||||||
&raw_encoder_t::raw_message_ready, true);
|
&raw_encoder_t::raw_message_ready, true);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -80,6 +80,8 @@ void zmq::router_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
|
|||||||
|
|
||||||
rc = pipe_->write (&probe_msg);
|
rc = pipe_->write (&probe_msg);
|
||||||
// zmq_assert (rc) is not applicable here, since it is not a bug.
|
// zmq_assert (rc) is not applicable here, since it is not a bug.
|
||||||
|
LIBZMQ_UNUSED (rc);
|
||||||
|
|
||||||
pipe_->flush ();
|
pipe_->flush ();
|
||||||
|
|
||||||
rc = probe_msg.close ();
|
rc = probe_msg.close ();
|
||||||
|
|||||||
@@ -515,7 +515,7 @@ void zmq::session_base_t::reconnect ()
|
|||||||
// and reestablish later on
|
// and reestablish later on
|
||||||
if (_pipe && options.immediate == 1 && _addr->protocol != "pgm"
|
if (_pipe && options.immediate == 1 && _addr->protocol != "pgm"
|
||||||
&& _addr->protocol != "epgm" && _addr->protocol != "norm"
|
&& _addr->protocol != "epgm" && _addr->protocol != "norm"
|
||||||
&& _addr->protocol != "udp") {
|
&& _addr->protocol != protocol_name::udp) {
|
||||||
_pipe->hiccup ();
|
_pipe->hiccup ();
|
||||||
_pipe->terminate (false);
|
_pipe->terminate (false);
|
||||||
_terminating_pipes.insert (_pipe);
|
_terminating_pipes.insert (_pipe);
|
||||||
@@ -557,10 +557,11 @@ void zmq::session_base_t::start_connecting (bool wait_)
|
|||||||
|
|
||||||
// Create the connecter object.
|
// Create the connecter object.
|
||||||
|
|
||||||
if (_addr->protocol == "tcp") {
|
if (_addr->protocol == protocol_name::tcp) {
|
||||||
if (!options.socks_proxy_address.empty ()) {
|
if (!options.socks_proxy_address.empty ()) {
|
||||||
address_t *proxy_address = new (std::nothrow)
|
address_t *proxy_address = new (std::nothrow)
|
||||||
address_t ("tcp", options.socks_proxy_address, this->get_ctx ());
|
address_t (protocol_name::tcp, options.socks_proxy_address,
|
||||||
|
this->get_ctx ());
|
||||||
alloc_assert (proxy_address);
|
alloc_assert (proxy_address);
|
||||||
socks_connecter_t *connecter = new (std::nothrow)
|
socks_connecter_t *connecter = new (std::nothrow)
|
||||||
socks_connecter_t (io_thread, this, options, _addr, proxy_address,
|
socks_connecter_t (io_thread, this, options, _addr, proxy_address,
|
||||||
@@ -578,7 +579,7 @@ void zmq::session_base_t::start_connecting (bool wait_)
|
|||||||
|
|
||||||
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS \
|
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS \
|
||||||
&& !defined ZMQ_HAVE_VXWORKS
|
&& !defined ZMQ_HAVE_VXWORKS
|
||||||
if (_addr->protocol == "ipc") {
|
if (_addr->protocol == protocol_name::ipc) {
|
||||||
ipc_connecter_t *connecter = new (std::nothrow)
|
ipc_connecter_t *connecter = new (std::nothrow)
|
||||||
ipc_connecter_t (io_thread, this, options, _addr, wait_);
|
ipc_connecter_t (io_thread, this, options, _addr, wait_);
|
||||||
alloc_assert (connecter);
|
alloc_assert (connecter);
|
||||||
@@ -587,7 +588,7 @@ void zmq::session_base_t::start_connecting (bool wait_)
|
|||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
#if defined ZMQ_HAVE_TIPC
|
#if defined ZMQ_HAVE_TIPC
|
||||||
if (_addr->protocol == "tipc") {
|
if (_addr->protocol == protocol_name::tipc) {
|
||||||
tipc_connecter_t *connecter = new (std::nothrow)
|
tipc_connecter_t *connecter = new (std::nothrow)
|
||||||
tipc_connecter_t (io_thread, this, options, _addr, wait_);
|
tipc_connecter_t (io_thread, this, options, _addr, wait_);
|
||||||
alloc_assert (connecter);
|
alloc_assert (connecter);
|
||||||
@@ -596,7 +597,7 @@ void zmq::session_base_t::start_connecting (bool wait_)
|
|||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
if (_addr->protocol == "udp") {
|
if (_addr->protocol == protocol_name::udp) {
|
||||||
zmq_assert (options.type == ZMQ_DISH || options.type == ZMQ_RADIO
|
zmq_assert (options.type == ZMQ_DISH || options.type == ZMQ_RADIO
|
||||||
|| options.type == ZMQ_DGRAM);
|
|| options.type == ZMQ_DGRAM);
|
||||||
|
|
||||||
@@ -698,7 +699,7 @@ void zmq::session_base_t::start_connecting (bool wait_)
|
|||||||
#endif // ZMQ_HAVE_NORM
|
#endif // ZMQ_HAVE_NORM
|
||||||
|
|
||||||
#if defined ZMQ_HAVE_VMCI
|
#if defined ZMQ_HAVE_VMCI
|
||||||
if (_addr->protocol == "vmci") {
|
if (_addr->protocol == protocol_name::vmci) {
|
||||||
vmci_connecter_t *connecter = new (std::nothrow)
|
vmci_connecter_t *connecter = new (std::nothrow)
|
||||||
vmci_connecter_t (io_thread, this, options, _addr, wait_);
|
vmci_connecter_t (io_thread, this, options, _addr, wait_);
|
||||||
alloc_assert (connecter);
|
alloc_assert (connecter);
|
||||||
|
|||||||
@@ -250,12 +250,13 @@ int zmq::signaler_t::wait (int timeout_)
|
|||||||
if (unlikely (rc < 0)) {
|
if (unlikely (rc < 0)) {
|
||||||
errno_assert (errno == EINTR);
|
errno_assert (errno == EINTR);
|
||||||
return -1;
|
return -1;
|
||||||
} else if (unlikely (rc == 0)) {
|
}
|
||||||
|
if (unlikely (rc == 0)) {
|
||||||
errno = EAGAIN;
|
errno = EAGAIN;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
#ifdef HAVE_FORK
|
#ifdef HAVE_FORK
|
||||||
else if (unlikely (pid != getpid ())) {
|
if (unlikely (pid != getpid ())) {
|
||||||
// we have forked and the file descriptor is closed. Emulate an interrupt
|
// we have forked and the file descriptor is closed. Emulate an interrupt
|
||||||
// response.
|
// response.
|
||||||
//printf("Child process %d signaler_t::wait returning simulating interrupt #2\n", getpid());
|
//printf("Child process %d signaler_t::wait returning simulating interrupt #2\n", getpid());
|
||||||
|
|||||||
@@ -295,9 +295,9 @@ int zmq::socket_base_t::check_protocol (const std::string &protocol_)
|
|||||||
if (protocol_ != "inproc"
|
if (protocol_ != "inproc"
|
||||||
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS \
|
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS \
|
||||||
&& !defined ZMQ_HAVE_VXWORKS
|
&& !defined ZMQ_HAVE_VXWORKS
|
||||||
&& protocol_ != "ipc"
|
&& protocol_ != protocol_name::ipc
|
||||||
#endif
|
#endif
|
||||||
&& protocol_ != "tcp"
|
&& protocol_ != protocol_name::tcp
|
||||||
#if defined ZMQ_HAVE_OPENPGM
|
#if defined ZMQ_HAVE_OPENPGM
|
||||||
// pgm/epgm transports only available if 0MQ is compiled with OpenPGM.
|
// pgm/epgm transports only available if 0MQ is compiled with OpenPGM.
|
||||||
&& protocol_ != "pgm"
|
&& protocol_ != "pgm"
|
||||||
@@ -305,15 +305,15 @@ int zmq::socket_base_t::check_protocol (const std::string &protocol_)
|
|||||||
#endif
|
#endif
|
||||||
#if defined ZMQ_HAVE_TIPC
|
#if defined ZMQ_HAVE_TIPC
|
||||||
// TIPC transport is only available on Linux.
|
// TIPC transport is only available on Linux.
|
||||||
&& protocol_ != "tipc"
|
&& protocol_ != protocol_name::tipc
|
||||||
#endif
|
#endif
|
||||||
#if defined ZMQ_HAVE_NORM
|
#if defined ZMQ_HAVE_NORM
|
||||||
&& protocol_ != "norm"
|
&& protocol_ != "norm"
|
||||||
#endif
|
#endif
|
||||||
#if defined ZMQ_HAVE_VMCI
|
#if defined ZMQ_HAVE_VMCI
|
||||||
&& protocol_ != "vmci"
|
&& protocol_ != protocol_name::vmci
|
||||||
#endif
|
#endif
|
||||||
&& protocol_ != "udp") {
|
&& protocol_ != protocol_name::udp) {
|
||||||
errno = EPROTONOSUPPORT;
|
errno = EPROTONOSUPPORT;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
@@ -330,7 +330,7 @@ int zmq::socket_base_t::check_protocol (const std::string &protocol_)
|
|||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
if (protocol_ == "udp"
|
if (protocol_ == protocol_name::udp
|
||||||
&& (options.type != ZMQ_DISH && options.type != ZMQ_RADIO
|
&& (options.type != ZMQ_DISH && options.type != ZMQ_RADIO
|
||||||
&& options.type != ZMQ_DGRAM)) {
|
&& options.type != ZMQ_DGRAM)) {
|
||||||
errno = ENOCOMPATPROTO;
|
errno = ENOCOMPATPROTO;
|
||||||
@@ -364,11 +364,6 @@ int zmq::socket_base_t::setsockopt (int option_,
|
|||||||
{
|
{
|
||||||
scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL);
|
scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL);
|
||||||
|
|
||||||
if (!options.is_valid (option_)) {
|
|
||||||
errno = EINVAL;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (unlikely (_ctx_terminated)) {
|
if (unlikely (_ctx_terminated)) {
|
||||||
errno = ETERM;
|
errno = ETERM;
|
||||||
return -1;
|
return -1;
|
||||||
@@ -516,7 +511,7 @@ int zmq::socket_base_t::bind (const char *addr_)
|
|||||||
return rc;
|
return rc;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (protocol == "udp") {
|
if (protocol == protocol_name::udp) {
|
||||||
if (!(options.type == ZMQ_DGRAM || options.type == ZMQ_DISH)) {
|
if (!(options.type == ZMQ_DGRAM || options.type == ZMQ_DISH)) {
|
||||||
errno = ENOCOMPATPROTO;
|
errno = ENOCOMPATPROTO;
|
||||||
return -1;
|
return -1;
|
||||||
@@ -580,7 +575,7 @@ int zmq::socket_base_t::bind (const char *addr_)
|
|||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (protocol == "tcp") {
|
if (protocol == protocol_name::tcp) {
|
||||||
tcp_listener_t *listener =
|
tcp_listener_t *listener =
|
||||||
new (std::nothrow) tcp_listener_t (io_thread, this, options);
|
new (std::nothrow) tcp_listener_t (io_thread, this, options);
|
||||||
alloc_assert (listener);
|
alloc_assert (listener);
|
||||||
@@ -601,7 +596,7 @@ int zmq::socket_base_t::bind (const char *addr_)
|
|||||||
|
|
||||||
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS \
|
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS \
|
||||||
&& !defined ZMQ_HAVE_VXWORKS
|
&& !defined ZMQ_HAVE_VXWORKS
|
||||||
if (protocol == "ipc") {
|
if (protocol == protocol_name::ipc) {
|
||||||
ipc_listener_t *listener =
|
ipc_listener_t *listener =
|
||||||
new (std::nothrow) ipc_listener_t (io_thread, this, options);
|
new (std::nothrow) ipc_listener_t (io_thread, this, options);
|
||||||
alloc_assert (listener);
|
alloc_assert (listener);
|
||||||
@@ -621,7 +616,7 @@ int zmq::socket_base_t::bind (const char *addr_)
|
|||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
#if defined ZMQ_HAVE_TIPC
|
#if defined ZMQ_HAVE_TIPC
|
||||||
if (protocol == "tipc") {
|
if (protocol == protocol_name::tipc) {
|
||||||
tipc_listener_t *listener =
|
tipc_listener_t *listener =
|
||||||
new (std::nothrow) tipc_listener_t (io_thread, this, options);
|
new (std::nothrow) tipc_listener_t (io_thread, this, options);
|
||||||
alloc_assert (listener);
|
alloc_assert (listener);
|
||||||
@@ -641,7 +636,7 @@ int zmq::socket_base_t::bind (const char *addr_)
|
|||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
#if defined ZMQ_HAVE_VMCI
|
#if defined ZMQ_HAVE_VMCI
|
||||||
if (protocol == "vmci") {
|
if (protocol == protocol_name::vmci) {
|
||||||
vmci_listener_t *listener =
|
vmci_listener_t *listener =
|
||||||
new (std::nothrow) vmci_listener_t (io_thread, this, options);
|
new (std::nothrow) vmci_listener_t (io_thread, this, options);
|
||||||
alloc_assert (listener);
|
alloc_assert (listener);
|
||||||
@@ -814,7 +809,7 @@ int zmq::socket_base_t::connect (const char *addr_)
|
|||||||
alloc_assert (paddr);
|
alloc_assert (paddr);
|
||||||
|
|
||||||
// Resolve address (if needed by the protocol)
|
// Resolve address (if needed by the protocol)
|
||||||
if (protocol == "tcp") {
|
if (protocol == protocol_name::tcp) {
|
||||||
// Do some basic sanity checks on tcp:// address syntax
|
// Do some basic sanity checks on tcp:// address syntax
|
||||||
// - hostname starts with digit or letter, with embedded '-' or '.'
|
// - hostname starts with digit or letter, with embedded '-' or '.'
|
||||||
// - IPv6 address may contain hex chars and colons.
|
// - IPv6 address may contain hex chars and colons.
|
||||||
@@ -858,7 +853,7 @@ int zmq::socket_base_t::connect (const char *addr_)
|
|||||||
}
|
}
|
||||||
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS \
|
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS \
|
||||||
&& !defined ZMQ_HAVE_VXWORKS
|
&& !defined ZMQ_HAVE_VXWORKS
|
||||||
else if (protocol == "ipc") {
|
else if (protocol == protocol_name::ipc) {
|
||||||
paddr->resolved.ipc_addr = new (std::nothrow) ipc_address_t ();
|
paddr->resolved.ipc_addr = new (std::nothrow) ipc_address_t ();
|
||||||
alloc_assert (paddr->resolved.ipc_addr);
|
alloc_assert (paddr->resolved.ipc_addr);
|
||||||
int rc = paddr->resolved.ipc_addr->resolve (address.c_str ());
|
int rc = paddr->resolved.ipc_addr->resolve (address.c_str ());
|
||||||
@@ -869,7 +864,7 @@ int zmq::socket_base_t::connect (const char *addr_)
|
|||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
if (protocol == "udp") {
|
if (protocol == protocol_name::udp) {
|
||||||
if (options.type != ZMQ_RADIO) {
|
if (options.type != ZMQ_RADIO) {
|
||||||
errno = ENOCOMPATPROTO;
|
errno = ENOCOMPATPROTO;
|
||||||
LIBZMQ_DELETE (paddr);
|
LIBZMQ_DELETE (paddr);
|
||||||
@@ -902,7 +897,7 @@ int zmq::socket_base_t::connect (const char *addr_)
|
|||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
#if defined ZMQ_HAVE_TIPC
|
#if defined ZMQ_HAVE_TIPC
|
||||||
else if (protocol == "tipc") {
|
else if (protocol == protocol_name::tipc) {
|
||||||
paddr->resolved.tipc_addr = new (std::nothrow) tipc_address_t ();
|
paddr->resolved.tipc_addr = new (std::nothrow) tipc_address_t ();
|
||||||
alloc_assert (paddr->resolved.tipc_addr);
|
alloc_assert (paddr->resolved.tipc_addr);
|
||||||
int rc = paddr->resolved.tipc_addr->resolve (address.c_str ());
|
int rc = paddr->resolved.tipc_addr->resolve (address.c_str ());
|
||||||
@@ -922,7 +917,7 @@ int zmq::socket_base_t::connect (const char *addr_)
|
|||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
#if defined ZMQ_HAVE_VMCI
|
#if defined ZMQ_HAVE_VMCI
|
||||||
else if (protocol == "vmci") {
|
else if (protocol == protocol_name::vmci) {
|
||||||
paddr->resolved.vmci_addr =
|
paddr->resolved.vmci_addr =
|
||||||
new (std::nothrow) vmci_address_t (this->get_ctx ());
|
new (std::nothrow) vmci_address_t (this->get_ctx ());
|
||||||
alloc_assert (paddr->resolved.vmci_addr);
|
alloc_assert (paddr->resolved.vmci_addr);
|
||||||
@@ -942,7 +937,8 @@ int zmq::socket_base_t::connect (const char *addr_)
|
|||||||
// PGM does not support subscription forwarding; ask for all data to be
|
// PGM does not support subscription forwarding; ask for all data to be
|
||||||
// sent to this pipe. (same for NORM, currently?)
|
// sent to this pipe. (same for NORM, currently?)
|
||||||
bool subscribe_to_all = protocol == "pgm" || protocol == "epgm"
|
bool subscribe_to_all = protocol == "pgm" || protocol == "epgm"
|
||||||
|| protocol == "norm" || protocol == "udp";
|
|| protocol == "norm"
|
||||||
|
|| protocol == protocol_name::udp;
|
||||||
pipe_t *newpipe = NULL;
|
pipe_t *newpipe = NULL;
|
||||||
|
|
||||||
if (options.immediate != 1 || subscribe_to_all) {
|
if (options.immediate != 1 || subscribe_to_all) {
|
||||||
@@ -1044,7 +1040,7 @@ int zmq::socket_base_t::term_endpoint (const char *addr_)
|
|||||||
// IPv4-in-IPv6 mapping (EG: tcp://[::ffff:127.0.0.1]:9999), so try to
|
// IPv4-in-IPv6 mapping (EG: tcp://[::ffff:127.0.0.1]:9999), so try to
|
||||||
// resolve before giving up. Given at this stage we don't know whether a
|
// resolve before giving up. Given at this stage we don't know whether a
|
||||||
// socket is connected or bound, try with both.
|
// socket is connected or bound, try with both.
|
||||||
if (protocol == "tcp") {
|
if (protocol == protocol_name::tcp) {
|
||||||
if (_endpoints.find (resolved_addr) == _endpoints.end ()) {
|
if (_endpoints.find (resolved_addr) == _endpoints.end ()) {
|
||||||
tcp_address_t *tcp_addr = new (std::nothrow) tcp_address_t ();
|
tcp_address_t *tcp_addr = new (std::nothrow) tcp_address_t ();
|
||||||
alloc_assert (tcp_addr);
|
alloc_assert (tcp_addr);
|
||||||
@@ -1581,7 +1577,7 @@ void zmq::socket_base_t::extract_flags (msg_t *msg_)
|
|||||||
_rcvmore = (msg_->flags () & msg_t::more) != 0;
|
_rcvmore = (msg_->flags () & msg_t::more) != 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int zmq::socket_base_t::monitor (const char *addr_, int events_)
|
int zmq::socket_base_t::monitor (const char *endpoint_, int events_)
|
||||||
{
|
{
|
||||||
scoped_lock_t lock (_monitor_sync);
|
scoped_lock_t lock (_monitor_sync);
|
||||||
|
|
||||||
@@ -1591,14 +1587,14 @@ int zmq::socket_base_t::monitor (const char *addr_, int events_)
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Support deregistering monitoring endpoints as well
|
// Support deregistering monitoring endpoints as well
|
||||||
if (addr_ == NULL) {
|
if (endpoint_ == NULL) {
|
||||||
stop_monitor ();
|
stop_monitor ();
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
// Parse addr_ string.
|
// Parse addr_ string.
|
||||||
std::string protocol;
|
std::string protocol;
|
||||||
std::string address;
|
std::string address;
|
||||||
if (parse_uri (addr_, protocol, address) || check_protocol (protocol))
|
if (parse_uri (endpoint_, protocol, address) || check_protocol (protocol))
|
||||||
return -1;
|
return -1;
|
||||||
|
|
||||||
// Event notification only supported over inproc://
|
// Event notification only supported over inproc://
|
||||||
@@ -1624,7 +1620,7 @@ int zmq::socket_base_t::monitor (const char *addr_, int events_)
|
|||||||
stop_monitor (false);
|
stop_monitor (false);
|
||||||
|
|
||||||
// Spawn the monitor socket endpoint
|
// Spawn the monitor socket endpoint
|
||||||
rc = zmq_bind (_monitor_socket, addr_);
|
rc = zmq_bind (_monitor_socket, endpoint_);
|
||||||
if (rc == -1)
|
if (rc == -1)
|
||||||
stop_monitor (false);
|
stop_monitor (false);
|
||||||
return rc;
|
return rc;
|
||||||
@@ -1813,34 +1809,35 @@ std::string zmq::routing_socket_base_t::extract_connect_routing_id ()
|
|||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
|
|
||||||
void zmq::routing_socket_base_t::add_out_pipe (blob_t routing_id, pipe_t *pipe_)
|
void zmq::routing_socket_base_t::add_out_pipe (blob_t routing_id_,
|
||||||
|
pipe_t *pipe_)
|
||||||
{
|
{
|
||||||
// Add the record into output pipes lookup table
|
// Add the record into output pipes lookup table
|
||||||
const out_pipe_t outpipe = {pipe_, true};
|
const out_pipe_t outpipe = {pipe_, true};
|
||||||
const bool ok =
|
const bool ok =
|
||||||
_out_pipes.ZMQ_MAP_INSERT_OR_EMPLACE (ZMQ_MOVE (routing_id), outpipe)
|
_out_pipes.ZMQ_MAP_INSERT_OR_EMPLACE (ZMQ_MOVE (routing_id_), outpipe)
|
||||||
.second;
|
.second;
|
||||||
zmq_assert (ok);
|
zmq_assert (ok);
|
||||||
}
|
}
|
||||||
|
|
||||||
bool zmq::routing_socket_base_t::has_out_pipe (const blob_t &routing_id) const
|
bool zmq::routing_socket_base_t::has_out_pipe (const blob_t &routing_id_) const
|
||||||
{
|
{
|
||||||
return 0 != _out_pipes.count (routing_id);
|
return 0 != _out_pipes.count (routing_id_);
|
||||||
}
|
}
|
||||||
|
|
||||||
zmq::routing_socket_base_t::out_pipe_t *
|
zmq::routing_socket_base_t::out_pipe_t *
|
||||||
zmq::routing_socket_base_t::lookup_out_pipe (const blob_t &routing_id)
|
zmq::routing_socket_base_t::lookup_out_pipe (const blob_t &routing_id_)
|
||||||
{
|
{
|
||||||
// TODO we could probably avoid constructor a temporary blob_t to call this function
|
// TODO we could probably avoid constructor a temporary blob_t to call this function
|
||||||
out_pipes_t::iterator it = _out_pipes.find (routing_id);
|
out_pipes_t::iterator it = _out_pipes.find (routing_id_);
|
||||||
return it == _out_pipes.end () ? NULL : &it->second;
|
return it == _out_pipes.end () ? NULL : &it->second;
|
||||||
}
|
}
|
||||||
|
|
||||||
const zmq::routing_socket_base_t::out_pipe_t *
|
const zmq::routing_socket_base_t::out_pipe_t *
|
||||||
zmq::routing_socket_base_t::lookup_out_pipe (const blob_t &routing_id) const
|
zmq::routing_socket_base_t::lookup_out_pipe (const blob_t &routing_id_) const
|
||||||
{
|
{
|
||||||
// TODO we could probably avoid constructor a temporary blob_t to call this function
|
// TODO we could probably avoid constructor a temporary blob_t to call this function
|
||||||
out_pipes_t::const_iterator it = _out_pipes.find (routing_id);
|
out_pipes_t::const_iterator it = _out_pipes.find (routing_id_);
|
||||||
return it == _out_pipes.end () ? NULL : &it->second;
|
return it == _out_pipes.end () ? NULL : &it->second;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1851,9 +1848,9 @@ void zmq::routing_socket_base_t::erase_out_pipe (pipe_t *pipe_)
|
|||||||
}
|
}
|
||||||
|
|
||||||
zmq::routing_socket_base_t::out_pipe_t
|
zmq::routing_socket_base_t::out_pipe_t
|
||||||
zmq::routing_socket_base_t::try_erase_out_pipe (const blob_t &routing_id)
|
zmq::routing_socket_base_t::try_erase_out_pipe (const blob_t &routing_id_)
|
||||||
{
|
{
|
||||||
const out_pipes_t::iterator it = _out_pipes.find (routing_id);
|
const out_pipes_t::iterator it = _out_pipes.find (routing_id_);
|
||||||
out_pipe_t res = {NULL, false};
|
out_pipe_t res = {NULL, false};
|
||||||
if (it != _out_pipes.end ()) {
|
if (it != _out_pipes.end ()) {
|
||||||
res = it->second;
|
res = it->second;
|
||||||
|
|||||||
@@ -137,8 +137,8 @@ class socket_base_t : public own_t,
|
|||||||
|
|
||||||
// Query the state of a specific peer. The default implementation
|
// Query the state of a specific peer. The default implementation
|
||||||
// always returns an ENOTSUP error.
|
// always returns an ENOTSUP error.
|
||||||
virtual int get_peer_state (const void *identity_,
|
virtual int get_peer_state (const void *routing_id_,
|
||||||
size_t identity_size_) const;
|
size_t routing_id_size_) const;
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
socket_base_t (zmq::ctx_t *parent_,
|
socket_base_t (zmq::ctx_t *parent_,
|
||||||
@@ -186,7 +186,7 @@ class socket_base_t : public own_t,
|
|||||||
|
|
||||||
private:
|
private:
|
||||||
// test if event should be sent and then dispatch it
|
// test if event should be sent and then dispatch it
|
||||||
void event (const std::string &addr_, intptr_t fd_, int type_);
|
void event (const std::string &addr_, intptr_t value_, int type_);
|
||||||
|
|
||||||
// Socket event data dispatch
|
// Socket event data dispatch
|
||||||
void monitor_event (int event_, intptr_t value_, const std::string &addr_);
|
void monitor_event (int event_, intptr_t value_, const std::string &addr_);
|
||||||
@@ -318,19 +318,18 @@ class routing_socket_base_t : public socket_base_t
|
|||||||
bool active;
|
bool active;
|
||||||
};
|
};
|
||||||
|
|
||||||
void add_out_pipe (blob_t routing_id, pipe_t *pipe_);
|
void add_out_pipe (blob_t routing_id_, pipe_t *pipe_);
|
||||||
bool has_out_pipe (const blob_t &routing_id) const;
|
bool has_out_pipe (const blob_t &routing_id_) const;
|
||||||
out_pipe_t *lookup_out_pipe (const blob_t &routing_id);
|
out_pipe_t *lookup_out_pipe (const blob_t &routing_id_);
|
||||||
const out_pipe_t *lookup_out_pipe (const blob_t &routing_id) const;
|
const out_pipe_t *lookup_out_pipe (const blob_t &routing_id_) const;
|
||||||
void erase_out_pipe (pipe_t *pipe_);
|
void erase_out_pipe (pipe_t *pipe_);
|
||||||
out_pipe_t try_erase_out_pipe (const blob_t &routing_id);
|
out_pipe_t try_erase_out_pipe (const blob_t &routing_id_);
|
||||||
template <typename Func> bool any_of_out_pipes (Func func)
|
template <typename Func> bool any_of_out_pipes (Func func_)
|
||||||
{
|
{
|
||||||
bool res = false;
|
bool res = false;
|
||||||
for (out_pipes_t::iterator it = _out_pipes.begin ();
|
for (out_pipes_t::iterator it = _out_pipes.begin ();
|
||||||
it != _out_pipes.end (); ++it) {
|
it != _out_pipes.end () && !res; ++it) {
|
||||||
if (res |= func (*it->second.pipe))
|
res |= func_ (*it->second.pipe);
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return res;
|
return res;
|
||||||
|
|||||||
@@ -45,7 +45,7 @@ zmq::socks_greeting_t::socks_greeting_t (uint8_t method_) : num_methods (1)
|
|||||||
methods[0] = method_;
|
methods[0] = method_;
|
||||||
}
|
}
|
||||||
|
|
||||||
zmq::socks_greeting_t::socks_greeting_t (uint8_t *methods_,
|
zmq::socks_greeting_t::socks_greeting_t (const uint8_t *methods_,
|
||||||
uint8_t num_methods_) :
|
uint8_t num_methods_) :
|
||||||
num_methods (num_methods_)
|
num_methods (num_methods_)
|
||||||
{
|
{
|
||||||
@@ -273,8 +273,8 @@ bool zmq::socks_response_decoder_t::message_ready () const
|
|||||||
return _bytes_read == 10;
|
return _bytes_read == 10;
|
||||||
if (atyp == 0x03)
|
if (atyp == 0x03)
|
||||||
return _bytes_read > 4 && _bytes_read == 4 + 1 + _buf[4] + 2u;
|
return _bytes_read > 4 && _bytes_read == 4 + 1 + _buf[4] + 2u;
|
||||||
else
|
|
||||||
return _bytes_read == 22;
|
return _bytes_read == 22;
|
||||||
}
|
}
|
||||||
|
|
||||||
zmq::socks_response_t zmq::socks_response_decoder_t::decode ()
|
zmq::socks_response_t zmq::socks_response_decoder_t::decode ()
|
||||||
|
|||||||
@@ -39,7 +39,7 @@ namespace zmq
|
|||||||
struct socks_greeting_t
|
struct socks_greeting_t
|
||||||
{
|
{
|
||||||
socks_greeting_t (uint8_t method_);
|
socks_greeting_t (uint8_t method_);
|
||||||
socks_greeting_t (uint8_t *methods_, uint8_t num_methods_);
|
socks_greeting_t (const uint8_t *methods_, uint8_t num_methods_);
|
||||||
|
|
||||||
uint8_t methods[UINT8_MAX];
|
uint8_t methods[UINT8_MAX];
|
||||||
const size_t num_methods;
|
const size_t num_methods;
|
||||||
|
|||||||
@@ -72,7 +72,7 @@ zmq::socks_connecter_t::socks_connecter_t (class io_thread_t *io_thread_,
|
|||||||
_current_reconnect_ivl (options.reconnect_ivl)
|
_current_reconnect_ivl (options.reconnect_ivl)
|
||||||
{
|
{
|
||||||
zmq_assert (_addr);
|
zmq_assert (_addr);
|
||||||
zmq_assert (_addr->protocol == "tcp");
|
zmq_assert (_addr->protocol == protocol_name::tcp);
|
||||||
_proxy_addr->to_string (_endpoint);
|
_proxy_addr->to_string (_endpoint);
|
||||||
_socket = _session->get_socket ();
|
_socket = _session->get_socket ();
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -71,10 +71,6 @@ zmq::tcp_address_t::tcp_address_t (const sockaddr *sa_, socklen_t sa_len_) :
|
|||||||
memcpy (&_address.ipv6, sa_, sizeof (_address.ipv6));
|
memcpy (&_address.ipv6, sa_, sizeof (_address.ipv6));
|
||||||
}
|
}
|
||||||
|
|
||||||
zmq::tcp_address_t::~tcp_address_t ()
|
|
||||||
{
|
|
||||||
}
|
|
||||||
|
|
||||||
int zmq::tcp_address_t::resolve (const char *name_, bool local_, bool ipv6_)
|
int zmq::tcp_address_t::resolve (const char *name_, bool local_, bool ipv6_)
|
||||||
{
|
{
|
||||||
// Test the ';' to know if we have a source address in name_
|
// Test the ';' to know if we have a source address in name_
|
||||||
@@ -117,7 +113,7 @@ int zmq::tcp_address_t::resolve (const char *name_, bool local_, bool ipv6_)
|
|||||||
return resolver.resolve (&_address, name_);
|
return resolver.resolve (&_address, name_);
|
||||||
}
|
}
|
||||||
|
|
||||||
int zmq::tcp_address_t::to_string (std::string &addr_)
|
int zmq::tcp_address_t::to_string (std::string &addr_) const
|
||||||
{
|
{
|
||||||
if (_address.family () != AF_INET && _address.family () != AF_INET6) {
|
if (_address.family () != AF_INET && _address.family () != AF_INET6) {
|
||||||
addr_.clear ();
|
addr_.clear ();
|
||||||
@@ -127,8 +123,8 @@ int zmq::tcp_address_t::to_string (std::string &addr_)
|
|||||||
// Not using service resolving because of
|
// Not using service resolving because of
|
||||||
// https://github.com/zeromq/libzmq/commit/1824574f9b5a8ce786853320e3ea09fe1f822bc4
|
// https://github.com/zeromq/libzmq/commit/1824574f9b5a8ce786853320e3ea09fe1f822bc4
|
||||||
char hbuf[NI_MAXHOST];
|
char hbuf[NI_MAXHOST];
|
||||||
int rc = getnameinfo (addr (), addrlen (), hbuf, sizeof (hbuf), NULL, 0,
|
const int rc = getnameinfo (addr (), addrlen (), hbuf, sizeof (hbuf), NULL,
|
||||||
NI_NUMERICHOST);
|
0, NI_NUMERICHOST);
|
||||||
if (rc != 0) {
|
if (rc != 0) {
|
||||||
addr_.clear ();
|
addr_.clear ();
|
||||||
return rc;
|
return rc;
|
||||||
@@ -148,28 +144,22 @@ int zmq::tcp_address_t::to_string (std::string &addr_)
|
|||||||
|
|
||||||
const sockaddr *zmq::tcp_address_t::addr () const
|
const sockaddr *zmq::tcp_address_t::addr () const
|
||||||
{
|
{
|
||||||
return &_address.generic;
|
return _address.as_sockaddr ();
|
||||||
}
|
}
|
||||||
|
|
||||||
socklen_t zmq::tcp_address_t::addrlen () const
|
socklen_t zmq::tcp_address_t::addrlen () const
|
||||||
{
|
{
|
||||||
if (_address.generic.sa_family == AF_INET6)
|
return _address.sockaddr_len ();
|
||||||
return static_cast<socklen_t> (sizeof (_address.ipv6));
|
|
||||||
|
|
||||||
return static_cast<socklen_t> (sizeof (_address.ipv4));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
const sockaddr *zmq::tcp_address_t::src_addr () const
|
const sockaddr *zmq::tcp_address_t::src_addr () const
|
||||||
{
|
{
|
||||||
return &_source_address.generic;
|
return _source_address.as_sockaddr ();
|
||||||
}
|
}
|
||||||
|
|
||||||
socklen_t zmq::tcp_address_t::src_addrlen () const
|
socklen_t zmq::tcp_address_t::src_addrlen () const
|
||||||
{
|
{
|
||||||
if (_address.family () == AF_INET6)
|
return _source_address.sockaddr_len ();
|
||||||
return static_cast<socklen_t> (sizeof (_source_address.ipv6));
|
|
||||||
|
|
||||||
return static_cast<socklen_t> (sizeof (_source_address.ipv4));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
bool zmq::tcp_address_t::has_src_addr () const
|
bool zmq::tcp_address_t::has_src_addr () const
|
||||||
@@ -186,10 +176,9 @@ sa_family_t zmq::tcp_address_t::family () const
|
|||||||
return _address.family ();
|
return _address.family ();
|
||||||
}
|
}
|
||||||
|
|
||||||
zmq::tcp_address_mask_t::tcp_address_mask_t () :
|
zmq::tcp_address_mask_t::tcp_address_mask_t () : _address_mask (-1)
|
||||||
tcp_address_t (),
|
|
||||||
_address_mask (-1)
|
|
||||||
{
|
{
|
||||||
|
memset (&_network_address, 0, sizeof (_network_address));
|
||||||
}
|
}
|
||||||
|
|
||||||
int zmq::tcp_address_mask_t::mask () const
|
int zmq::tcp_address_mask_t::mask () const
|
||||||
@@ -224,35 +213,39 @@ int zmq::tcp_address_mask_t::resolve (const char *name_, bool ipv6_)
|
|||||||
|
|
||||||
ip_resolver_t resolver (resolver_opts);
|
ip_resolver_t resolver (resolver_opts);
|
||||||
|
|
||||||
const int rc = resolver.resolve (&_address, addr_str.c_str ());
|
const int rc = resolver.resolve (&_network_address, addr_str.c_str ());
|
||||||
if (rc != 0)
|
if (rc != 0)
|
||||||
return rc;
|
return rc;
|
||||||
|
|
||||||
// Parse the cidr mask number.
|
// Parse the cidr mask number.
|
||||||
const int full_mask_ipv4 = sizeof (_address.ipv4.sin_addr) * CHAR_BIT;
|
const int full_mask_ipv4 =
|
||||||
const int full_mask_ipv6 = sizeof (_address.ipv6.sin6_addr) * CHAR_BIT;
|
sizeof (_network_address.ipv4.sin_addr) * CHAR_BIT;
|
||||||
|
const int full_mask_ipv6 =
|
||||||
|
sizeof (_network_address.ipv6.sin6_addr) * CHAR_BIT;
|
||||||
if (mask_str.empty ()) {
|
if (mask_str.empty ()) {
|
||||||
_address_mask =
|
_address_mask = _network_address.family () == AF_INET6 ? full_mask_ipv6
|
||||||
_address.family () == AF_INET6 ? full_mask_ipv6 : full_mask_ipv4;
|
: full_mask_ipv4;
|
||||||
} else if (mask_str == "0")
|
} else if (mask_str == "0")
|
||||||
_address_mask = 0;
|
_address_mask = 0;
|
||||||
else {
|
else {
|
||||||
const int mask = atoi (mask_str.c_str ());
|
const long mask = strtol (mask_str.c_str (), NULL, 10);
|
||||||
if ((mask < 1)
|
if ((mask < 1)
|
||||||
|| (_address.family () == AF_INET6 && mask > full_mask_ipv6)
|
|| (_network_address.family () == AF_INET6 && mask > full_mask_ipv6)
|
||||||
|| (_address.family () != AF_INET6 && mask > full_mask_ipv4)) {
|
|| (_network_address.family () != AF_INET6
|
||||||
|
&& mask > full_mask_ipv4)) {
|
||||||
errno = EINVAL;
|
errno = EINVAL;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
_address_mask = mask;
|
_address_mask = static_cast<int> (mask);
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int zmq::tcp_address_mask_t::to_string (std::string &addr_)
|
int zmq::tcp_address_mask_t::to_string (std::string &addr_) const
|
||||||
{
|
{
|
||||||
if (_address.family () != AF_INET && _address.family () != AF_INET6) {
|
if (_network_address.family () != AF_INET
|
||||||
|
&& _network_address.family () != AF_INET6) {
|
||||||
addr_.clear ();
|
addr_.clear ();
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
@@ -262,14 +255,15 @@ int zmq::tcp_address_mask_t::to_string (std::string &addr_)
|
|||||||
}
|
}
|
||||||
|
|
||||||
char hbuf[NI_MAXHOST];
|
char hbuf[NI_MAXHOST];
|
||||||
int rc = getnameinfo (addr (), addrlen (), hbuf, sizeof (hbuf), NULL, 0,
|
const int rc = getnameinfo (_network_address.as_sockaddr (),
|
||||||
NI_NUMERICHOST);
|
_network_address.sockaddr_len (), hbuf,
|
||||||
|
sizeof (hbuf), NULL, 0, NI_NUMERICHOST);
|
||||||
if (rc != 0) {
|
if (rc != 0) {
|
||||||
addr_.clear ();
|
addr_.clear ();
|
||||||
return rc;
|
return rc;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (_address.family () == AF_INET6) {
|
if (_network_address.family () == AF_INET6) {
|
||||||
std::stringstream s;
|
std::stringstream s;
|
||||||
s << "[" << hbuf << "]/" << _address_mask;
|
s << "[" << hbuf << "]/" << _address_mask;
|
||||||
addr_ = s.str ();
|
addr_ = s.str ();
|
||||||
@@ -285,9 +279,10 @@ bool zmq::tcp_address_mask_t::match_address (const struct sockaddr *ss_,
|
|||||||
const socklen_t ss_len_) const
|
const socklen_t ss_len_) const
|
||||||
{
|
{
|
||||||
zmq_assert (_address_mask != -1 && ss_ != NULL
|
zmq_assert (_address_mask != -1 && ss_ != NULL
|
||||||
&& ss_len_ >= (socklen_t) sizeof (struct sockaddr));
|
&& ss_len_
|
||||||
|
>= static_cast<socklen_t> (sizeof (struct sockaddr)));
|
||||||
|
|
||||||
if (ss_->sa_family != _address.generic.sa_family)
|
if (ss_->sa_family != _network_address.generic.sa_family)
|
||||||
return false;
|
return false;
|
||||||
|
|
||||||
if (_address_mask > 0) {
|
if (_address_mask > 0) {
|
||||||
@@ -298,15 +293,15 @@ bool zmq::tcp_address_mask_t::match_address (const struct sockaddr *ss_,
|
|||||||
their_bytes = reinterpret_cast<const uint8_t *> (
|
their_bytes = reinterpret_cast<const uint8_t *> (
|
||||||
&((reinterpret_cast<const struct sockaddr_in6 *> (ss_))
|
&((reinterpret_cast<const struct sockaddr_in6 *> (ss_))
|
||||||
->sin6_addr));
|
->sin6_addr));
|
||||||
our_bytes =
|
our_bytes = reinterpret_cast<const uint8_t *> (
|
||||||
reinterpret_cast<const uint8_t *> (&_address.ipv6.sin6_addr);
|
&_network_address.ipv6.sin6_addr);
|
||||||
mask = sizeof (struct in6_addr) * 8;
|
mask = sizeof (struct in6_addr) * 8;
|
||||||
} else {
|
} else {
|
||||||
zmq_assert (ss_len_ == sizeof (struct sockaddr_in));
|
zmq_assert (ss_len_ == sizeof (struct sockaddr_in));
|
||||||
their_bytes = reinterpret_cast<const uint8_t *> (&(
|
their_bytes = reinterpret_cast<const uint8_t *> (&(
|
||||||
(reinterpret_cast<const struct sockaddr_in *> (ss_))->sin_addr));
|
(reinterpret_cast<const struct sockaddr_in *> (ss_))->sin_addr));
|
||||||
our_bytes =
|
our_bytes = reinterpret_cast<const uint8_t *> (
|
||||||
reinterpret_cast<const uint8_t *> (&_address.ipv4.sin_addr);
|
&_network_address.ipv4.sin_addr);
|
||||||
mask = sizeof (struct in_addr) * 8;
|
mask = sizeof (struct in_addr) * 8;
|
||||||
}
|
}
|
||||||
if (_address_mask < mask)
|
if (_address_mask < mask)
|
||||||
|
|||||||
@@ -44,7 +44,6 @@ class tcp_address_t
|
|||||||
public:
|
public:
|
||||||
tcp_address_t ();
|
tcp_address_t ();
|
||||||
tcp_address_t (const sockaddr *sa_, socklen_t sa_len_);
|
tcp_address_t (const sockaddr *sa_, socklen_t sa_len_);
|
||||||
virtual ~tcp_address_t ();
|
|
||||||
|
|
||||||
// This function translates textual TCP address into an address
|
// This function translates textual TCP address into an address
|
||||||
// structure. If 'local' is true, names are resolved as local interface
|
// structure. If 'local' is true, names are resolved as local interface
|
||||||
@@ -53,7 +52,7 @@ class tcp_address_t
|
|||||||
int resolve (const char *name_, bool local_, bool ipv6_);
|
int resolve (const char *name_, bool local_, bool ipv6_);
|
||||||
|
|
||||||
// The opposite to resolve()
|
// The opposite to resolve()
|
||||||
virtual int to_string (std::string &addr_);
|
int to_string (std::string &addr_) const;
|
||||||
|
|
||||||
#if defined ZMQ_HAVE_WINDOWS
|
#if defined ZMQ_HAVE_WINDOWS
|
||||||
unsigned short family () const;
|
unsigned short family () const;
|
||||||
@@ -67,13 +66,13 @@ class tcp_address_t
|
|||||||
socklen_t src_addrlen () const;
|
socklen_t src_addrlen () const;
|
||||||
bool has_src_addr () const;
|
bool has_src_addr () const;
|
||||||
|
|
||||||
protected:
|
private:
|
||||||
ip_addr_t _address;
|
ip_addr_t _address;
|
||||||
ip_addr_t _source_address;
|
ip_addr_t _source_address;
|
||||||
bool _has_src_addr;
|
bool _has_src_addr;
|
||||||
};
|
};
|
||||||
|
|
||||||
class tcp_address_mask_t : public tcp_address_t
|
class tcp_address_mask_t
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
tcp_address_mask_t ();
|
tcp_address_mask_t ();
|
||||||
@@ -84,7 +83,7 @@ class tcp_address_mask_t : public tcp_address_t
|
|||||||
int resolve (const char *name_, bool ipv6_);
|
int resolve (const char *name_, bool ipv6_);
|
||||||
|
|
||||||
// The opposite to resolve()
|
// The opposite to resolve()
|
||||||
int to_string (std::string &addr_);
|
int to_string (std::string &addr_) const;
|
||||||
|
|
||||||
int mask () const;
|
int mask () const;
|
||||||
|
|
||||||
@@ -92,6 +91,7 @@ class tcp_address_mask_t : public tcp_address_t
|
|||||||
const socklen_t ss_len_) const;
|
const socklen_t ss_len_) const;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
ip_addr_t _network_address;
|
||||||
int _address_mask;
|
int _address_mask;
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -82,7 +82,7 @@ zmq::tcp_connecter_t::tcp_connecter_t (class io_thread_t *io_thread_,
|
|||||||
_socket (_session->get_socket ())
|
_socket (_session->get_socket ())
|
||||||
{
|
{
|
||||||
zmq_assert (_addr);
|
zmq_assert (_addr);
|
||||||
zmq_assert (_addr->protocol == "tcp");
|
zmq_assert (_addr->protocol == protocol_name::tcp);
|
||||||
_addr->to_string (_endpoint);
|
_addr->to_string (_endpoint);
|
||||||
// TODO the return value is unused! what if it fails? if this is impossible
|
// TODO the return value is unused! what if it fails? if this is impossible
|
||||||
// or does not matter, change such that endpoint in initialized using an
|
// or does not matter, change such that endpoint in initialized using an
|
||||||
|
|||||||
@@ -49,14 +49,14 @@ zmq::v1_encoder_t::~v1_encoder_t ()
|
|||||||
void zmq::v1_encoder_t::size_ready ()
|
void zmq::v1_encoder_t::size_ready ()
|
||||||
{
|
{
|
||||||
// Write message body into the buffer.
|
// Write message body into the buffer.
|
||||||
next_step (in_progress->data (), in_progress->size (),
|
next_step (in_progress ()->data (), in_progress ()->size (),
|
||||||
&v1_encoder_t::message_ready, true);
|
&v1_encoder_t::message_ready, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
void zmq::v1_encoder_t::message_ready ()
|
void zmq::v1_encoder_t::message_ready ()
|
||||||
{
|
{
|
||||||
// Get the message size.
|
// Get the message size.
|
||||||
size_t size = in_progress->size ();
|
size_t size = in_progress ()->size ();
|
||||||
|
|
||||||
// Account for the 'flags' byte.
|
// Account for the 'flags' byte.
|
||||||
size++;
|
size++;
|
||||||
@@ -66,12 +66,12 @@ void zmq::v1_encoder_t::message_ready ()
|
|||||||
// message size. In both cases 'flags' field follows.
|
// message size. In both cases 'flags' field follows.
|
||||||
if (size < UCHAR_MAX) {
|
if (size < UCHAR_MAX) {
|
||||||
_tmpbuf[0] = static_cast<unsigned char> (size);
|
_tmpbuf[0] = static_cast<unsigned char> (size);
|
||||||
_tmpbuf[1] = (in_progress->flags () & msg_t::more);
|
_tmpbuf[1] = (in_progress ()->flags () & msg_t::more);
|
||||||
next_step (_tmpbuf, 2, &v1_encoder_t::size_ready, false);
|
next_step (_tmpbuf, 2, &v1_encoder_t::size_ready, false);
|
||||||
} else {
|
} else {
|
||||||
_tmpbuf[0] = UCHAR_MAX;
|
_tmpbuf[0] = UCHAR_MAX;
|
||||||
put_uint64 (_tmpbuf + 1, size);
|
put_uint64 (_tmpbuf + 1, size);
|
||||||
_tmpbuf[9] = (in_progress->flags () & msg_t::more);
|
_tmpbuf[9] = (in_progress ()->flags () & msg_t::more);
|
||||||
next_step (_tmpbuf, 10, &v1_encoder_t::size_ready, false);
|
next_step (_tmpbuf, 10, &v1_encoder_t::size_ready, false);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -52,17 +52,17 @@ void zmq::v2_encoder_t::message_ready ()
|
|||||||
// Encode flags.
|
// Encode flags.
|
||||||
unsigned char &protocol_flags = _tmp_buf[0];
|
unsigned char &protocol_flags = _tmp_buf[0];
|
||||||
protocol_flags = 0;
|
protocol_flags = 0;
|
||||||
if (in_progress->flags () & msg_t::more)
|
if (in_progress ()->flags () & msg_t::more)
|
||||||
protocol_flags |= v2_protocol_t::more_flag;
|
protocol_flags |= v2_protocol_t::more_flag;
|
||||||
if (in_progress->size () > UCHAR_MAX)
|
if (in_progress ()->size () > UCHAR_MAX)
|
||||||
protocol_flags |= v2_protocol_t::large_flag;
|
protocol_flags |= v2_protocol_t::large_flag;
|
||||||
if (in_progress->flags () & msg_t::command)
|
if (in_progress ()->flags () & msg_t::command)
|
||||||
protocol_flags |= v2_protocol_t::command_flag;
|
protocol_flags |= v2_protocol_t::command_flag;
|
||||||
|
|
||||||
// Encode the message length. For messages less then 256 bytes,
|
// Encode the message length. For messages less then 256 bytes,
|
||||||
// the length is encoded as 8-bit unsigned integer. For larger
|
// the length is encoded as 8-bit unsigned integer. For larger
|
||||||
// messages, 64-bit unsigned integer in network byte order is used.
|
// messages, 64-bit unsigned integer in network byte order is used.
|
||||||
const size_t size = in_progress->size ();
|
const size_t size = in_progress ()->size ();
|
||||||
if (unlikely (size > UCHAR_MAX)) {
|
if (unlikely (size > UCHAR_MAX)) {
|
||||||
put_uint64 (_tmp_buf + 1, size);
|
put_uint64 (_tmp_buf + 1, size);
|
||||||
next_step (_tmp_buf, 9, &v2_encoder_t::size_ready, false);
|
next_step (_tmp_buf, 9, &v2_encoder_t::size_ready, false);
|
||||||
@@ -75,6 +75,6 @@ void zmq::v2_encoder_t::message_ready ()
|
|||||||
void zmq::v2_encoder_t::size_ready ()
|
void zmq::v2_encoder_t::size_ready ()
|
||||||
{
|
{
|
||||||
// Write message body into the buffer.
|
// Write message body into the buffer.
|
||||||
next_step (in_progress->data (), in_progress->size (),
|
next_step (in_progress ()->data (), in_progress ()->size (),
|
||||||
&v2_encoder_t::message_ready, true);
|
&v2_encoder_t::message_ready, true);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -87,10 +87,10 @@ struct iovec
|
|||||||
#include "msg.hpp"
|
#include "msg.hpp"
|
||||||
#include "fd.hpp"
|
#include "fd.hpp"
|
||||||
#include "metadata.hpp"
|
#include "metadata.hpp"
|
||||||
#include "signaler.hpp"
|
|
||||||
#include "socket_poller.hpp"
|
#include "socket_poller.hpp"
|
||||||
#include "timers.hpp"
|
#include "timers.hpp"
|
||||||
#include "ip.hpp"
|
#include "ip.hpp"
|
||||||
|
#include "address.hpp"
|
||||||
|
|
||||||
#if defined ZMQ_HAVE_OPENPGM
|
#if defined ZMQ_HAVE_OPENPGM
|
||||||
#define __PGM_WININT_H__
|
#define __PGM_WININT_H__
|
||||||
@@ -1489,7 +1489,7 @@ int zmq_device (int /* type */, void *frontend_, void *backend_)
|
|||||||
int zmq_has (const char *capability_)
|
int zmq_has (const char *capability_)
|
||||||
{
|
{
|
||||||
#if !defined(ZMQ_HAVE_WINDOWS) && !defined(ZMQ_HAVE_OPENVMS)
|
#if !defined(ZMQ_HAVE_WINDOWS) && !defined(ZMQ_HAVE_OPENVMS)
|
||||||
if (strcmp (capability_, "ipc") == 0)
|
if (strcmp (capability_, zmq::protocol_name::ipc) == 0)
|
||||||
return true;
|
return true;
|
||||||
#endif
|
#endif
|
||||||
#if defined(ZMQ_HAVE_OPENPGM)
|
#if defined(ZMQ_HAVE_OPENPGM)
|
||||||
@@ -1497,7 +1497,7 @@ int zmq_has (const char *capability_)
|
|||||||
return true;
|
return true;
|
||||||
#endif
|
#endif
|
||||||
#if defined(ZMQ_HAVE_TIPC)
|
#if defined(ZMQ_HAVE_TIPC)
|
||||||
if (strcmp (capability_, "tipc") == 0)
|
if (strcmp (capability_, zmq::protocol_name::tipc) == 0)
|
||||||
return true;
|
return true;
|
||||||
#endif
|
#endif
|
||||||
#if defined(ZMQ_HAVE_NORM)
|
#if defined(ZMQ_HAVE_NORM)
|
||||||
@@ -1513,7 +1513,7 @@ int zmq_has (const char *capability_)
|
|||||||
return true;
|
return true;
|
||||||
#endif
|
#endif
|
||||||
#if defined(ZMQ_HAVE_VMCI)
|
#if defined(ZMQ_HAVE_VMCI)
|
||||||
if (strcmp (capability_, "vmci") == 0)
|
if (strcmp (capability_, zmq::protocol_name::vmci) == 0)
|
||||||
return true;
|
return true;
|
||||||
#endif
|
#endif
|
||||||
#if defined(ZMQ_BUILD_DRAFT_API)
|
#if defined(ZMQ_BUILD_DRAFT_API)
|
||||||
|
|||||||
Reference in New Issue
Block a user