Merge pull request #226 from kurdybacha/clang-format

Problem: inconsistent code style
This commit is contained in:
Simon Giesecke 2018-05-11 23:21:45 +02:00 committed by GitHub
commit bc82352c95
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 1580 additions and 1540 deletions

53
.clang-format Normal file
View File

@ -0,0 +1,53 @@
BasedOnStyle: LLVM
IndentWidth: 4
UseTab: Never
BreakBeforeBraces: Custom
BraceWrapping:
AfterClass: true
AfterControlStatement: false
AfterEnum: true
AfterFunction: true
AfterNamespace: true
AfterObjCDeclaration: true
AfterStruct: true
AfterUnion: true
BeforeCatch: true
BeforeElse: false
IndentBraces: false
AlignConsecutiveAssignments: false
AlignConsecutiveDeclarations: false
AllowShortIfStatementsOnASingleLine: false
IndentCaseLabels: true
BinPackArguments: true
BinPackParameters: false
AlignTrailingComments: true
AllowShortBlocksOnASingleLine: false
AllowAllParametersOfDeclarationOnNextLine: true
AllowShortFunctionsOnASingleLine: InlineOnly
AlwaysBreakTemplateDeclarations: false
ColumnLimit: 80
MaxEmptyLinesToKeep: 2
KeepEmptyLinesAtTheStartOfBlocks: false
ContinuationIndentWidth: 2
PointerAlignment: Right
ReflowComments: false
SpaceBeforeAssignmentOperators: true
SpaceBeforeParens: Always
SpaceInEmptyParentheses: false
SpacesInAngles: false
SpacesInParentheses: false
SpacesInSquareBrackets: false
Standard: Cpp11
SortIncludes: false
FixNamespaceComments: false
BreakBeforeBinaryOperators: NonAssignment
SpaceAfterTemplateKeyword: true
AlignAfterOpenBracket: Align
AlignOperands: true
BreakConstructorInitializers: AfterColon
ConstructorInitializerAllOnOneLineOrOnePerLine: true
SpaceAfterCStyleCast: true
BreakBeforeTernaryOperators: true

View File

@ -3,6 +3,5 @@
int main (int argc, char **argv) int main (int argc, char **argv)
{ {
zmq::context_t context; zmq::context_t context;
return 0; return 0;
} }

View File

@ -13,8 +13,10 @@ TEST(active_poller, create_destroy)
ASSERT_TRUE (active_poller.empty ()); ASSERT_TRUE (active_poller.empty ());
} }
static_assert(!std::is_copy_constructible<zmq::active_poller_t>::value, "active_active_poller_t should not be copy-constructible"); static_assert (!std::is_copy_constructible<zmq::active_poller_t>::value,
static_assert(!std::is_copy_assignable<zmq::active_poller_t>::value, "active_active_poller_t should not be copy-assignable"); "active_active_poller_t should not be copy-constructible");
static_assert (!std::is_copy_assignable<zmq::active_poller_t>::value,
"active_active_poller_t should not be copy-assignable");
TEST (active_poller, move_construct_empty) TEST (active_poller, move_construct_empty)
{ {
@ -104,14 +106,16 @@ TEST(active_poller, add_handler_twice_throws)
zmq::active_poller_t::handler_t handler; zmq::active_poller_t::handler_t handler;
active_poller.add (socket, ZMQ_POLLIN, handler); active_poller.add (socket, ZMQ_POLLIN, handler);
/// \todo the actual error code should be checked /// \todo the actual error code should be checked
ASSERT_THROW(active_poller.add(socket, ZMQ_POLLIN, handler), zmq::error_t); ASSERT_THROW (active_poller.add (socket, ZMQ_POLLIN, handler),
zmq::error_t);
} }
TEST (active_poller, wait_with_no_handlers_throws) TEST (active_poller, wait_with_no_handlers_throws)
{ {
zmq::active_poller_t active_poller; zmq::active_poller_t active_poller;
/// \todo the actual error code should be checked /// \todo the actual error code should be checked
ASSERT_THROW(active_poller.wait(std::chrono::milliseconds{10}), zmq::error_t); ASSERT_THROW (active_poller.wait (std::chrono::milliseconds{10}),
zmq::error_t);
} }
TEST (active_poller, remove_unregistered_throws) TEST (active_poller, remove_unregistered_throws)
@ -141,12 +145,11 @@ TEST(active_poller, remove_registered_non_empty)
ASSERT_NO_THROW (active_poller.remove (socket)); ASSERT_NO_THROW (active_poller.remove (socket));
} }
namespace { namespace
{
struct server_client_setup : common_server_client_setup struct server_client_setup : common_server_client_setup
{ {
zmq::active_poller_t::handler_t handler = [&](short e) { zmq::active_poller_t::handler_t handler = [&](short e) { events = e; };
events = e;
};
short events = 0; short events = 0;
}; };
@ -160,7 +163,8 @@ TEST(active_poller, poll_basic)
zmq::active_poller_t active_poller; zmq::active_poller_t active_poller;
bool message_received = false; bool message_received = false;
zmq::active_poller_t::handler_t handler = [&message_received](short events) { zmq::active_poller_t::handler_t handler =
[&message_received](short events) {
ASSERT_TRUE (0 != (events & ZMQ_POLLIN)); ASSERT_TRUE (0 != (events & ZMQ_POLLIN));
message_received = true; message_received = true;
}; };
@ -184,8 +188,7 @@ TEST(active_poller, client_server)
if (0 != (e & ZMQ_POLLIN)) { if (0 != (e & ZMQ_POLLIN)) {
zmq::message_t zmq_msg; zmq::message_t zmq_msg;
ASSERT_NO_THROW (s.server.recv (&zmq_msg)); // get message ASSERT_NO_THROW (s.server.recv (&zmq_msg)); // get message
std::string recv_msg(zmq_msg.data<char>(), std::string recv_msg (zmq_msg.data<char> (), zmq_msg.size ());
zmq_msg.size());
ASSERT_EQ (send_msg, recv_msg); ASSERT_EQ (send_msg, recv_msg);
} else if (0 != (e & ~ZMQ_POLLOUT)) { } else if (0 != (e & ~ZMQ_POLLOUT)) {
ASSERT_TRUE (false) << "Unexpected event type " << events; ASSERT_TRUE (false) << "Unexpected event type " << events;
@ -203,7 +206,8 @@ TEST(active_poller, client_server)
// Re-add server socket with pollout flag // Re-add server socket with pollout flag
ASSERT_NO_THROW (active_poller.remove (s.server)); ASSERT_NO_THROW (active_poller.remove (s.server));
ASSERT_NO_THROW(active_poller.add(s.server, ZMQ_POLLIN | ZMQ_POLLOUT, handler)); ASSERT_NO_THROW (
active_poller.add (s.server, ZMQ_POLLIN | ZMQ_POLLOUT, handler));
ASSERT_EQ (1, active_poller.wait (std::chrono::milliseconds{-1})); ASSERT_EQ (1, active_poller.wait (std::chrono::milliseconds{-1}));
ASSERT_EQ (events, ZMQ_POLLOUT); ASSERT_EQ (events, ZMQ_POLLOUT);
} }
@ -214,7 +218,8 @@ TEST(active_poller, add_invalid_socket_throws)
zmq::active_poller_t active_poller; zmq::active_poller_t active_poller;
zmq::socket_t a{context, zmq::socket_type::router}; zmq::socket_t a{context, zmq::socket_type::router};
zmq::socket_t b{std::move (a)}; zmq::socket_t b{std::move (a)};
ASSERT_THROW (active_poller.add (a, ZMQ_POLLIN, zmq::active_poller_t::handler_t {}), ASSERT_THROW (
active_poller.add (a, ZMQ_POLLIN, zmq::active_poller_t::handler_t{}),
zmq::error_t); zmq::error_t);
} }
@ -223,7 +228,8 @@ TEST(active_poller, remove_invalid_socket_throws)
zmq::context_t context; zmq::context_t context;
zmq::socket_t socket{context, zmq::socket_type::router}; zmq::socket_t socket{context, zmq::socket_type::router};
zmq::active_poller_t active_poller; zmq::active_poller_t active_poller;
ASSERT_NO_THROW (active_poller.add (socket, ZMQ_POLLIN, zmq::active_poller_t::handler_t {})); ASSERT_NO_THROW (active_poller.add (socket, ZMQ_POLLIN,
zmq::active_poller_t::handler_t{}));
ASSERT_EQ (1u, active_poller.size ()); ASSERT_EQ (1u, active_poller.size ());
std::vector<zmq::socket_t> sockets; std::vector<zmq::socket_t> sockets;
sockets.emplace_back (std::move (socket)); sockets.emplace_back (std::move (socket));
@ -264,7 +270,8 @@ TEST(active_poller, modify_not_added_throws)
zmq::socket_t a{context, zmq::socket_type::push}; zmq::socket_t a{context, zmq::socket_type::push};
zmq::socket_t b{context, zmq::socket_type::push}; zmq::socket_t b{context, zmq::socket_type::push};
zmq::active_poller_t active_poller; zmq::active_poller_t active_poller;
ASSERT_NO_THROW (active_poller.add (a, ZMQ_POLLIN, zmq::active_poller_t::handler_t {})); ASSERT_NO_THROW (
active_poller.add (a, ZMQ_POLLIN, zmq::active_poller_t::handler_t{}));
ASSERT_THROW (active_poller.modify (b, ZMQ_POLLIN), zmq::error_t); ASSERT_THROW (active_poller.modify (b, ZMQ_POLLIN), zmq::error_t);
} }
@ -273,7 +280,8 @@ TEST(active_poller, modify_simple)
zmq::context_t context; zmq::context_t context;
zmq::socket_t a{context, zmq::socket_type::push}; zmq::socket_t a{context, zmq::socket_type::push};
zmq::active_poller_t active_poller; zmq::active_poller_t active_poller;
ASSERT_NO_THROW (active_poller.add (a, ZMQ_POLLIN, zmq::active_poller_t::handler_t {})); ASSERT_NO_THROW (
active_poller.add (a, ZMQ_POLLIN, zmq::active_poller_t::handler_t{}));
ASSERT_NO_THROW (active_poller.modify (a, ZMQ_POLLIN | ZMQ_POLLOUT)); ASSERT_NO_THROW (active_poller.modify (a, ZMQ_POLLIN | ZMQ_POLLOUT));
} }
@ -308,9 +316,8 @@ TEST(active_poller, wait_one_return)
// Setup active_poller // Setup active_poller
zmq::active_poller_t active_poller; zmq::active_poller_t active_poller;
ASSERT_NO_THROW(active_poller.add(s.server, ZMQ_POLLIN, [&count](short) { ASSERT_NO_THROW (
++count; active_poller.add (s.server, ZMQ_POLLIN, [&count](short) { ++count; }));
}));
// client sends message // client sends message
ASSERT_NO_THROW (s.client.send ("Hi")); ASSERT_NO_THROW (s.client.send ("Hi"));
@ -356,9 +363,8 @@ TEST(active_poller, received_on_move_constructed_active_poller)
int count = 0; int count = 0;
// Setup active_poller a // Setup active_poller a
zmq::active_poller_t a; zmq::active_poller_t a;
ASSERT_NO_THROW(a.add(s.server, ZMQ_POLLIN, [&count](short) { ASSERT_NO_THROW (
++count; a.add (s.server, ZMQ_POLLIN, [&count](short) { ++count; }));
}));
// client sends message // client sends message
ASSERT_NO_THROW (s.client.send ("Hi")); ASSERT_NO_THROW (s.client.send ("Hi"));
// wait for message and verify it is received // wait for message and verify it is received
@ -387,7 +393,8 @@ TEST(active_poller, remove_from_handler)
zmq::active_poller_t active_poller; zmq::active_poller_t active_poller;
int count = 0; int count = 0;
for (auto i = 0; i < ITER_NO; ++i) { for (auto i = 0; i < ITER_NO; ++i) {
ASSERT_NO_THROW(active_poller.add(setup_list[i].server, ZMQ_POLLIN, [&,i](short events) { ASSERT_NO_THROW (active_poller.add (
setup_list[i].server, ZMQ_POLLIN, [&, i](short events) {
ASSERT_EQ (events, ZMQ_POLLIN); ASSERT_EQ (events, ZMQ_POLLIN);
active_poller.remove (setup_list[ITER_NO - i - 1].server); active_poller.remove (setup_list[ITER_NO - i - 1].server);
ASSERT_EQ (ITER_NO - i - 1, active_poller.size ()); ASSERT_EQ (ITER_NO - i - 1, active_poller.size ());

View File

@ -2,8 +2,10 @@
#include <zmq.hpp> #include <zmq.hpp>
#if defined(ZMQ_CPP11) #if defined(ZMQ_CPP11)
static_assert(!std::is_copy_constructible<zmq::message_t>::value, "message_t should not be copy-constructible"); static_assert (!std::is_copy_constructible<zmq::message_t>::value,
static_assert(!std::is_copy_assignable<zmq::message_t>::value, "message_t should not be copy-assignable"); "message_t should not be copy-constructible");
static_assert (!std::is_copy_assignable<zmq::message_t>::value,
"message_t should not be copy-assignable");
#endif #endif
TEST (message, constructor_default) TEST (message, constructor_default)
@ -30,7 +32,8 @@ TEST (message, constructor_pointer_size)
ASSERT_EQ (0, memcmp (data, hi_msg.data (), 2)); ASSERT_EQ (0, memcmp (data, hi_msg.data (), 2));
} }
TEST (message, constructor_char_array) { TEST (message, constructor_char_array)
{
const zmq::message_t hi_msg (data, strlen (data)); const zmq::message_t hi_msg (data, strlen (data));
ASSERT_EQ (2u, hi_msg.size ()); ASSERT_EQ (2u, hi_msg.size ());
ASSERT_EQ (0, memcmp (data, hi_msg.data (), 2)); ASSERT_EQ (0, memcmp (data, hi_msg.data (), 2));
@ -75,38 +78,43 @@ TEST (message, assign_move_empty_before_and_after)
} }
#endif #endif
TEST (message, equality_self) { TEST (message, equality_self)
{
const zmq::message_t hi_msg (data, strlen (data)); const zmq::message_t hi_msg (data, strlen (data));
ASSERT_EQ (hi_msg, hi_msg); ASSERT_EQ (hi_msg, hi_msg);
} }
TEST (message, equality_equal) { TEST (message, equality_equal)
{
const zmq::message_t hi_msg_a (data, strlen (data)); const zmq::message_t hi_msg_a (data, strlen (data));
const zmq::message_t hi_msg_b (data, strlen (data)); const zmq::message_t hi_msg_b (data, strlen (data));
ASSERT_EQ (hi_msg_a, hi_msg_b); ASSERT_EQ (hi_msg_a, hi_msg_b);
} }
TEST (message, equality_equal_empty) { TEST (message, equality_equal_empty)
{
const zmq::message_t msg_a; const zmq::message_t msg_a;
const zmq::message_t msg_b; const zmq::message_t msg_b;
ASSERT_EQ (msg_a, msg_b); ASSERT_EQ (msg_a, msg_b);
} }
TEST (message, equality_non_equal) { TEST (message, equality_non_equal)
{
const zmq::message_t msg_a ("Hi", 2); const zmq::message_t msg_a ("Hi", 2);
const zmq::message_t msg_b ("Hello", 5); const zmq::message_t msg_b ("Hello", 5);
ASSERT_NE (msg_a, msg_b); ASSERT_NE (msg_a, msg_b);
} }
TEST (message, equality_non_equal_rhs_empty) { TEST (message, equality_non_equal_rhs_empty)
{
const zmq::message_t msg_a ("Hi", 2); const zmq::message_t msg_a ("Hi", 2);
const zmq::message_t msg_b; const zmq::message_t msg_b;
ASSERT_NE (msg_a, msg_b); ASSERT_NE (msg_a, msg_b);
} }
TEST (message, equality_non_equal_lhs_empty) { TEST (message, equality_non_equal_lhs_empty)
{
const zmq::message_t msg_a; const zmq::message_t msg_a;
const zmq::message_t msg_b ("Hi", 2); const zmq::message_t msg_b ("Hi", 2);
ASSERT_NE (msg_a, msg_b); ASSERT_NE (msg_a, msg_b);
} }

View File

@ -8,8 +8,10 @@ TEST(multipart, legacy_test)
{ {
using namespace zmq; using namespace zmq;
bool ok = true; (void) ok; bool ok = true;
float num = 0; (void) num; (void) ok;
float num = 0;
(void) num;
std::string str = ""; std::string str = "";
message_t msg; message_t msg;

View File

@ -10,8 +10,10 @@ TEST(poller, create_destroy)
zmq::poller_t<> poller; zmq::poller_t<> poller;
} }
static_assert(!std::is_copy_constructible<zmq::poller_t<>>::value, "poller_t should not be copy-constructible"); static_assert (!std::is_copy_constructible<zmq::poller_t<>>::value,
static_assert(!std::is_copy_assignable<zmq::poller_t<>>::value, "poller_t should not be copy-assignable"); "poller_t should not be copy-constructible");
static_assert (!std::is_copy_assignable<zmq::poller_t<>>::value,
"poller_t should not be copy-assignable");
TEST (poller, move_construct_empty) TEST (poller, move_construct_empty)
{ {
@ -91,7 +93,8 @@ TEST(poller, wait_with_no_handlers_throws)
zmq::poller_t<> poller; zmq::poller_t<> poller;
std::vector<zmq_poller_event_t> events; std::vector<zmq_poller_event_t> events;
/// \todo the actual error code should be checked /// \todo the actual error code should be checked
ASSERT_THROW(poller.wait_all(events, std::chrono::milliseconds{10}), zmq::error_t); ASSERT_THROW (poller.wait_all (events, std::chrono::milliseconds{10}),
zmq::error_t);
} }
TEST (poller, remove_unregistered_throws) TEST (poller, remove_unregistered_throws)
@ -142,8 +145,7 @@ TEST(poller, add_invalid_socket_throws)
zmq::poller_t<> poller; zmq::poller_t<> poller;
zmq::socket_t a{context, zmq::socket_type::router}; zmq::socket_t a{context, zmq::socket_type::router};
zmq::socket_t b{std::move (a)}; zmq::socket_t b{std::move (a)};
ASSERT_THROW (poller.add (a, ZMQ_POLLIN, nullptr), ASSERT_THROW (poller.add (a, ZMQ_POLLIN, nullptr), zmq::error_t);
zmq::error_t);
} }
TEST (poller, remove_invalid_socket_throws) TEST (poller, remove_invalid_socket_throws)
@ -243,7 +245,8 @@ TEST(poller, wait_on_move_constructed_poller)
zmq::poller_t<> b{std::move (a)}; zmq::poller_t<> b{std::move (a)};
std::vector<zmq_poller_event_t> events (1); std::vector<zmq_poller_event_t> events (1);
/// \todo the actual error code should be checked /// \todo the actual error code should be checked
ASSERT_THROW(a.wait_all (events, std::chrono::milliseconds{10}), zmq::error_t); ASSERT_THROW (a.wait_all (events, std::chrono::milliseconds{10}),
zmq::error_t);
ASSERT_EQ (1, b.wait_all (events, std::chrono::milliseconds{-1})); ASSERT_EQ (1, b.wait_all (events, std::chrono::milliseconds{-1}));
} }
@ -257,7 +260,8 @@ TEST(poller, wait_on_move_assigned_poller)
b = {std::move (a)}; b = {std::move (a)};
/// \todo the actual error code should be checked /// \todo the actual error code should be checked
std::vector<zmq_poller_event_t> events (1); std::vector<zmq_poller_event_t> events (1);
ASSERT_THROW(a.wait_all (events, std::chrono::milliseconds{10}), zmq::error_t); ASSERT_THROW (a.wait_all (events, std::chrono::milliseconds{10}),
zmq::error_t);
ASSERT_EQ (1, b.wait_all (events, std::chrono::milliseconds{-1})); ASSERT_EQ (1, b.wait_all (events, std::chrono::milliseconds{-1}));
} }
@ -273,7 +277,8 @@ TEST(poller, remove_from_handler)
// Setup poller // Setup poller
zmq::poller_t<> poller; zmq::poller_t<> poller;
for (auto i = 0; i < ITER_NO; ++i) { for (auto i = 0; i < ITER_NO; ++i) {
ASSERT_NO_THROW(poller.add(setup_list[i].server, ZMQ_POLLIN, nullptr)); ASSERT_NO_THROW (
poller.add (setup_list[i].server, ZMQ_POLLIN, nullptr));
} }
// Clients send messages // Clients send messages
for (auto &s : setup_list) { for (auto &s : setup_list) {
@ -288,7 +293,8 @@ TEST(poller, remove_from_handler)
// Fire all handlers in one wait // Fire all handlers in one wait
std::vector<zmq_poller_event_t> events (ITER_NO); std::vector<zmq_poller_event_t> events (ITER_NO);
ASSERT_EQ(ITER_NO, poller.wait_all (events, std::chrono::milliseconds{-1})); ASSERT_EQ (ITER_NO,
poller.wait_all (events, std::chrono::milliseconds{-1}));
} }
#endif #endif

353
zmq.hpp
View File

@ -67,7 +67,8 @@
#define CPPZMQ_VERSION_PATCH 0 #define CPPZMQ_VERSION_PATCH 0
#define CPPZMQ_VERSION \ #define CPPZMQ_VERSION \
ZMQ_MAKE_VERSION (CPPZMQ_VERSION_MAJOR, CPPZMQ_VERSION_MINOR, CPPZMQ_VERSION_PATCH) ZMQ_MAKE_VERSION (CPPZMQ_VERSION_MAJOR, CPPZMQ_VERSION_MINOR, \
CPPZMQ_VERSION_PATCH)
#ifdef ZMQ_CPP11 #ifdef ZMQ_CPP11
#include <chrono> #include <chrono>
@ -78,9 +79,9 @@
#endif #endif
// Detect whether the compiler supports C++11 rvalue references. // Detect whether the compiler supports C++11 rvalue references.
#if (defined(__GNUC__) && (__GNUC__ > 4 || \ #if (defined(__GNUC__) \
(__GNUC__ == 4 && __GNUC_MINOR__ > 2)) && \ && (__GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ > 2)) \
defined(__GXX_EXPERIMENTAL_CXX0X__)) && defined(__GXX_EXPERIMENTAL_CXX0X__))
#define ZMQ_HAS_RVALUE_REFS #define ZMQ_HAS_RVALUE_REFS
#define ZMQ_DELETED_FUNCTION = delete #define ZMQ_DELETED_FUNCTION = delete
#elif defined(__clang__) #elif defined(__clang__)
@ -110,7 +111,8 @@
#if ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 1, 0) #if ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 1, 0)
#define ZMQ_HAS_PROXY_STEERABLE #define ZMQ_HAS_PROXY_STEERABLE
/* Socket event data */ /* Socket event data */
typedef struct { typedef struct
{
uint16_t event; // id of the event as bitfield uint16_t event; // id of the event as bitfield
int32_t value; // value is either error code, fd or reconnect interval int32_t value; // value is either error code, fd or reconnect interval
} zmq_event_t; } zmq_event_t;
@ -132,53 +134,47 @@ typedef struct {
namespace zmq namespace zmq
{ {
typedef zmq_free_fn free_fn; typedef zmq_free_fn free_fn;
typedef zmq_pollitem_t pollitem_t; typedef zmq_pollitem_t pollitem_t;
class error_t : public std::exception class error_t : public std::exception
{ {
public: public:
error_t () : errnum (zmq_errno ()) {} error_t () : errnum (zmq_errno ()) {}
#ifdef ZMQ_CPP11 #ifdef ZMQ_CPP11
virtual const char *what () const noexcept virtual const char *what () const noexcept { return zmq_strerror (errnum); }
{
return zmq_strerror (errnum);
}
#else #else
virtual const char *what() const throw () virtual const char *what () const throw () { return zmq_strerror (errnum); }
{
return zmq_strerror(errnum);
}
#endif #endif
int num () const int num () const { return errnum; }
{
return errnum;
}
private: private:
int errnum; int errnum;
}; };
inline int poll (zmq_pollitem_t const* items_, size_t nitems_, long timeout_ = -1) inline int
poll (zmq_pollitem_t const *items_, size_t nitems_, long timeout_ = -1)
{ {
int rc = zmq_poll (const_cast<zmq_pollitem_t*>(items_), static_cast<int>(nitems_), timeout_); int rc = zmq_poll (const_cast<zmq_pollitem_t *> (items_),
static_cast<int> (nitems_), timeout_);
if (rc < 0) if (rc < 0)
throw error_t (); throw error_t ();
return rc; return rc;
} }
#ifdef ZMQ_CPP11 #ifdef ZMQ_CPP11
inline int poll(zmq_pollitem_t const* items, size_t nitems, std::chrono::milliseconds timeout) inline int poll (zmq_pollitem_t const *items,
size_t nitems,
std::chrono::milliseconds timeout)
{ {
return poll (items, nitems, static_cast<long> (timeout.count ())); return poll (items, nitems, static_cast<long> (timeout.count ()));
} }
inline int poll(std::vector<zmq_pollitem_t> const& items, std::chrono::milliseconds timeout) inline int poll (std::vector<zmq_pollitem_t> const &items,
std::chrono::milliseconds timeout)
{ {
return poll(items.data(), items.size(), static_cast<long>(timeout.count())); return poll (items.data (), items.size (),
static_cast<long> (timeout.count ()));
} }
inline int poll (std::vector<zmq_pollitem_t> const &items, long timeout_ = -1) inline int poll (std::vector<zmq_pollitem_t> const &items, long timeout_ = -1)
@ -188,7 +184,6 @@ namespace zmq
#endif #endif
inline void proxy (void *frontend, void *backend, void *capture) inline void proxy (void *frontend, void *backend, void *capture)
{ {
int rc = zmq_proxy (frontend, backend, capture); int rc = zmq_proxy (frontend, backend, capture);
@ -197,7 +192,8 @@ namespace zmq
} }
#ifdef ZMQ_HAS_PROXY_STEERABLE #ifdef ZMQ_HAS_PROXY_STEERABLE
inline void proxy_steerable (void *frontend, void *backend, void *capture, void *control) inline void
proxy_steerable (void *frontend, void *backend, void *capture, void *control)
{ {
int rc = zmq_proxy_steerable (frontend, backend, capture, control); int rc = zmq_proxy_steerable (frontend, backend, capture, control);
if (rc != 0) if (rc != 0)
@ -224,7 +220,6 @@ namespace zmq
friend class socket_t; friend class socket_t;
public: public:
inline message_t () inline message_t ()
{ {
int rc = zmq_msg_init (&msg); int rc = zmq_msg_init (&msg);
@ -239,8 +234,7 @@ namespace zmq
throw error_t (); throw error_t ();
} }
template<typename I> message_t(I first, I last): template <typename I> message_t (I first, I last) : msg ()
msg()
{ {
typedef typename std::iterator_traits<I>::difference_type size_type; typedef typename std::iterator_traits<I>::difference_type size_type;
typedef typename std::iterator_traits<I>::value_type value_t; typedef typename std::iterator_traits<I>::value_type value_t;
@ -250,10 +244,10 @@ namespace zmq
if (rc != 0) if (rc != 0)
throw error_t (); throw error_t ();
value_t *dest = data<value_t> (); value_t *dest = data<value_t> ();
while (first != last) while (first != last) {
{
*dest = *first; *dest = *first;
++dest; ++first; ++dest;
++first;
} }
} }
@ -265,7 +259,9 @@ namespace zmq
memcpy (data (), data_, size_); memcpy (data (), data_, size_);
} }
inline message_t (void *data_, size_t size_, free_fn *ffn_, inline message_t (void *data_,
size_t size_,
free_fn *ffn_,
void *hint_ = NULL) void *hint_ = NULL)
{ {
int rc = zmq_msg_init_data (&msg, data_, size_, ffn_, hint_); int rc = zmq_msg_init_data (&msg, data_, size_, ffn_, hint_);
@ -274,9 +270,10 @@ namespace zmq
} }
#if defined(ZMQ_BUILD_DRAFT_API) && defined(ZMQ_CPP11) #if defined(ZMQ_BUILD_DRAFT_API) && defined(ZMQ_CPP11)
template<typename T> message_t (const T &msg_) template <typename T>
: message_t (std::begin (msg_), std::end (msg_)) message_t (const T &msg_) : message_t (std::begin (msg_), std::end (msg_))
{} {
}
#endif #endif
#ifdef ZMQ_HAS_RVALUE_REFS #ifdef ZMQ_HAS_RVALUE_REFS
@ -331,8 +328,8 @@ namespace zmq
memcpy (data (), data_, size_); memcpy (data (), data_, size_);
} }
inline void rebuild (void *data_, size_t size_, free_fn *ffn_, inline void
void *hint_ = NULL) rebuild (void *data_, size_t size_, free_fn *ffn_, void *hint_ = NULL)
{ {
int rc = zmq_msg_close (&msg); int rc = zmq_msg_close (&msg);
if (rc != 0) if (rc != 0)
@ -362,10 +359,7 @@ namespace zmq
return rc != 0; return rc != 0;
} }
inline void *data () ZMQ_NOTHROW inline void *data () ZMQ_NOTHROW { return zmq_msg_data (&msg); }
{
return zmq_msg_data (&msg);
}
inline const void *data () const ZMQ_NOTHROW inline const void *data () const ZMQ_NOTHROW
{ {
@ -396,7 +390,8 @@ namespace zmq
inline bool operator== (const message_t &other) const ZMQ_NOTHROW inline bool operator== (const message_t &other) const ZMQ_NOTHROW
{ {
const size_t my_size = size (); const size_t my_size = size ();
return my_size == other.size () && 0 == memcmp (data (), other.data (), my_size); return my_size == other.size ()
&& 0 == memcmp (data (), other.data (), my_size);
} }
inline bool operator!= (const message_t &other) const ZMQ_NOTHROW inline bool operator!= (const message_t &other) const ZMQ_NOTHROW
@ -426,7 +421,8 @@ namespace zmq
size_t size = this->size (); size_t size = this->size ();
int is_ascii[2] = {0, 0}; int is_ascii[2] = {0, 0};
os << "zmq::message_t [size " << std::dec << std::setw(3) << std::setfill('0') << size << "] ("; os << "zmq::message_t [size " << std::dec << std::setw (3)
<< std::setfill ('0') << size << "] (";
// Totally arbitrary // Totally arbitrary
if (size >= 1000) { if (size >= 1000) {
os << "... too big to print)"; os << "... too big to print)";
@ -441,7 +437,8 @@ namespace zmq
if (is_ascii[1]) { if (is_ascii[1]) {
os << byte; os << byte;
} else { } else {
os << std::hex << std::uppercase << std::setw(2) << std::setfill('0') << static_cast<short>(byte); os << std::hex << std::uppercase << std::setw (2)
<< std::setfill ('0') << static_cast<short> (byte);
} }
is_ascii[0] = is_ascii[1]; is_ascii[0] = is_ascii[1];
} }
@ -473,7 +470,8 @@ namespace zmq
} }
inline explicit context_t (int io_threads_, int max_sockets_ = ZMQ_MAX_SOCKETS_DFLT) inline explicit context_t (int io_threads_,
int max_sockets_ = ZMQ_MAX_SOCKETS_DFLT)
{ {
ptr = zmq_ctx_new (); ptr = zmq_ctx_new ();
if (ptr == NULL) if (ptr == NULL)
@ -505,15 +503,9 @@ namespace zmq
return rc; return rc;
} }
inline int getctxopt(int option_) inline int getctxopt (int option_) { return zmq_ctx_get (ptr, option_); }
{
return zmq_ctx_get (ptr, option_);
}
inline ~context_t () ZMQ_NOTHROW inline ~context_t () ZMQ_NOTHROW { close (); }
{
close();
}
inline void close () ZMQ_NOTHROW inline void close () ZMQ_NOTHROW
{ {
@ -528,22 +520,16 @@ namespace zmq
// Be careful with this, it's probably only useful for // Be careful with this, it's probably only useful for
// using the C api together with an existing C++ api. // using the C api together with an existing C++ api.
// Normally you should never need to use this. // Normally you should never need to use this.
inline ZMQ_EXPLICIT operator void* () ZMQ_NOTHROW inline ZMQ_EXPLICIT operator void * () ZMQ_NOTHROW { return ptr; }
{
return ptr;
}
inline ZMQ_EXPLICIT operator void const * () const ZMQ_NOTHROW inline ZMQ_EXPLICIT operator void const * () const ZMQ_NOTHROW
{ {
return ptr; return ptr;
} }
inline operator bool() const ZMQ_NOTHROW inline operator bool () const ZMQ_NOTHROW { return ptr != NULL; }
{
return ptr != NULL;
}
private:
private:
void *ptr; void *ptr;
context_t (const context_t &) ZMQ_DELETED_FUNCTION; context_t (const context_t &) ZMQ_DELETED_FUNCTION;
@ -577,11 +563,9 @@ namespace zmq
class socket_t class socket_t
{ {
friend class monitor_t; friend class monitor_t;
public: public:
inline socket_t(context_t& context_, int type_) inline socket_t (context_t &context_, int type_) { init (context_, type_); }
{
init(context_, type_);
}
#ifdef ZMQ_CPP11 #ifdef ZMQ_CPP11
inline socket_t (context_t &context_, socket_type type_) inline socket_t (context_t &context_, socket_type type_)
@ -591,8 +575,7 @@ namespace zmq
#endif #endif
#ifdef ZMQ_HAS_RVALUE_REFS #ifdef ZMQ_HAS_RVALUE_REFS
inline socket_t(socket_t&& rhs) ZMQ_NOTHROW : inline socket_t (socket_t &&rhs) ZMQ_NOTHROW : ptr (rhs.ptr),
ptr(rhs.ptr),
ctxptr (rhs.ctxptr) ctxptr (rhs.ctxptr)
{ {
rhs.ptr = NULL; rhs.ptr = NULL;
@ -605,20 +588,11 @@ namespace zmq
} }
#endif #endif
inline ~socket_t () ZMQ_NOTHROW inline ~socket_t () ZMQ_NOTHROW { close (); }
{
close();
}
inline operator void* () ZMQ_NOTHROW inline operator void * () ZMQ_NOTHROW { return ptr; }
{
return ptr;
}
inline operator void const* () const ZMQ_NOTHROW inline operator void const * () const ZMQ_NOTHROW { return ptr; }
{
return ptr;
}
inline void close () ZMQ_NOTHROW inline void close () ZMQ_NOTHROW
{ {
@ -635,16 +609,15 @@ namespace zmq
setsockopt (option_, &optval, sizeof (T)); setsockopt (option_, &optval, sizeof (T));
} }
inline void setsockopt (int option_, const void *optval_, inline void setsockopt (int option_, const void *optval_, size_t optvallen_)
size_t optvallen_)
{ {
int rc = zmq_setsockopt (ptr, option_, optval_, optvallen_); int rc = zmq_setsockopt (ptr, option_, optval_, optvallen_);
if (rc != 0) if (rc != 0)
throw error_t (); throw error_t ();
} }
inline void getsockopt (int option_, void *optval_, inline void
size_t *optvallen_) const getsockopt (int option_, void *optval_, size_t *optvallen_) const
{ {
int rc = zmq_getsockopt (ptr, option_, optval_, optvallen_); int rc = zmq_getsockopt (ptr, option_, optval_, optvallen_);
if (rc != 0) if (rc != 0)
@ -659,10 +632,7 @@ namespace zmq
return optval; return optval;
} }
inline void bind(std::string const& addr) inline void bind (std::string const &addr) { bind (addr.c_str ()); }
{
bind(addr.c_str());
}
inline void bind (const char *addr_) inline void bind (const char *addr_)
{ {
@ -671,10 +641,7 @@ namespace zmq
throw error_t (); throw error_t ();
} }
inline void unbind(std::string const& addr) inline void unbind (std::string const &addr) { unbind (addr.c_str ()); }
{
unbind(addr.c_str());
}
inline void unbind (const char *addr_) inline void unbind (const char *addr_)
{ {
@ -683,10 +650,7 @@ namespace zmq
throw error_t (); throw error_t ();
} }
inline void connect(std::string const& addr) inline void connect (std::string const &addr) { connect (addr.c_str ()); }
{
connect(addr.c_str());
}
inline void connect (const char *addr_) inline void connect (const char *addr_)
{ {
@ -707,10 +671,7 @@ namespace zmq
throw error_t (); throw error_t ();
} }
inline bool connected() const ZMQ_NOTHROW inline bool connected () const ZMQ_NOTHROW { return (ptr != NULL); }
{
return(ptr != NULL);
}
inline size_t send (const void *buf_, size_t len_, int flags_ = 0) inline size_t send (const void *buf_, size_t len_, int flags_ = 0)
{ {
@ -793,13 +754,12 @@ namespace zmq
if (monitor_socket) if (monitor_socket)
zmq_close (monitor_socket); zmq_close (monitor_socket);
} }
#ifdef ZMQ_HAS_RVALUE_REFS #ifdef ZMQ_HAS_RVALUE_REFS
monitor_t(monitor_t&& rhs) ZMQ_NOTHROW : monitor_t (monitor_t &&rhs) ZMQ_NOTHROW
socketPtr(rhs.socketPtr), : socketPtr (rhs.socketPtr),
monitor_socket (rhs.monitor_socket) monitor_socket (rhs.monitor_socket)
{ {
rhs.socketPtr = NULL; rhs.socketPtr = NULL;
@ -810,22 +770,24 @@ namespace zmq
#endif #endif
void monitor (socket_t &socket,
void monitor(socket_t &socket, std::string const& addr, int events = ZMQ_EVENT_ALL) std::string const &addr,
int events = ZMQ_EVENT_ALL)
{ {
monitor (socket, addr.c_str (), events); monitor (socket, addr.c_str (), events);
} }
void monitor(socket_t &socket, const char *addr_, int events = ZMQ_EVENT_ALL) void
monitor (socket_t &socket, const char *addr_, int events = ZMQ_EVENT_ALL)
{ {
init (socket, addr_, events); init (socket, addr_, events);
while(true) while (true) {
{
check_event (-1); check_event (-1);
} }
} }
void init(socket_t &socket, std::string const& addr, int events = ZMQ_EVENT_ALL) void
init (socket_t &socket, std::string const &addr, int events = ZMQ_EVENT_ALL)
{ {
init (socket, addr.c_str (), events); init (socket, addr.c_str (), events);
} }
@ -859,16 +821,13 @@ namespace zmq
zmq::poll (&items[0], 1, timeout); zmq::poll (&items[0], 1, timeout);
if (items [0].revents & ZMQ_POLLIN) if (items[0].revents & ZMQ_POLLIN) {
{
int rc = zmq_msg_recv (&eventMsg, monitor_socket, 0); int rc = zmq_msg_recv (&eventMsg, monitor_socket, 0);
if (rc == -1 && zmq_errno () == ETERM) if (rc == -1 && zmq_errno () == ETERM)
return false; return false;
assert (rc != -1); assert (rc != -1);
} } else {
else
{
zmq_msg_close (&eventMsg); zmq_msg_close (&eventMsg);
return false; return false;
} }
@ -876,19 +835,20 @@ namespace zmq
#if ZMQ_VERSION_MAJOR >= 4 #if ZMQ_VERSION_MAJOR >= 4
const char *data = static_cast<const char *> (zmq_msg_data (&eventMsg)); const char *data = static_cast<const char *> (zmq_msg_data (&eventMsg));
zmq_event_t msgEvent; zmq_event_t msgEvent;
memcpy(&msgEvent.event, data, sizeof(uint16_t)); data += sizeof(uint16_t); memcpy (&msgEvent.event, data, sizeof (uint16_t));
data += sizeof (uint16_t);
memcpy (&msgEvent.value, data, sizeof (int32_t)); memcpy (&msgEvent.value, data, sizeof (int32_t));
zmq_event_t *event = &msgEvent; zmq_event_t *event = &msgEvent;
#else #else
zmq_event_t* event = static_cast<zmq_event_t*>(zmq_msg_data(&eventMsg)); zmq_event_t *event =
static_cast<zmq_event_t *> (zmq_msg_data (&eventMsg));
#endif #endif
#ifdef ZMQ_NEW_MONITOR_EVENT_LAYOUT #ifdef ZMQ_NEW_MONITOR_EVENT_LAYOUT
zmq_msg_t addrMsg; zmq_msg_t addrMsg;
zmq_msg_init (&addrMsg); zmq_msg_init (&addrMsg);
int rc = zmq_msg_recv (&addrMsg, monitor_socket, 0); int rc = zmq_msg_recv (&addrMsg, monitor_socket, 0);
if (rc == -1 && zmq_errno() == ETERM) if (rc == -1 && zmq_errno () == ETERM) {
{
zmq_msg_close (&eventMsg); zmq_msg_close (&eventMsg);
return false; return false;
} }
@ -903,8 +863,7 @@ namespace zmq
#endif #endif
#ifdef ZMQ_EVENT_MONITOR_STOPPED #ifdef ZMQ_EVENT_MONITOR_STOPPED
if (event->event == ZMQ_EVENT_MONITOR_STOPPED) if (event->event == ZMQ_EVENT_MONITOR_STOPPED) {
{
zmq_msg_close (&eventMsg); zmq_msg_close (&eventMsg);
return true; return true;
} }
@ -988,28 +947,111 @@ namespace zmq
} }
#endif #endif
virtual void on_monitor_started () {} virtual void on_monitor_started () {}
virtual void on_event_connected(const zmq_event_t &event_, const char* addr_) { (void)event_; (void)addr_; } virtual void on_event_connected (const zmq_event_t &event_,
virtual void on_event_connect_delayed(const zmq_event_t &event_, const char* addr_) { (void)event_; (void)addr_; } const char *addr_)
virtual void on_event_connect_retried(const zmq_event_t &event_, const char* addr_) { (void)event_; (void)addr_; } {
virtual void on_event_listening(const zmq_event_t &event_, const char* addr_) { (void)event_; (void)addr_; } (void) event_;
virtual void on_event_bind_failed(const zmq_event_t &event_, const char* addr_) { (void)event_; (void)addr_; } (void) addr_;
virtual void on_event_accepted(const zmq_event_t &event_, const char* addr_) { (void)event_; (void)addr_; } }
virtual void on_event_accept_failed(const zmq_event_t &event_, const char* addr_) { (void)event_; (void)addr_; } virtual void on_event_connect_delayed (const zmq_event_t &event_,
virtual void on_event_closed(const zmq_event_t &event_, const char* addr_) { (void)event_; (void)addr_; } const char *addr_)
virtual void on_event_close_failed(const zmq_event_t &event_, const char* addr_) { (void)event_; (void)addr_; } {
virtual void on_event_disconnected(const zmq_event_t &event_, const char* addr_) { (void)event_; (void)addr_; } (void) event_;
(void) addr_;
}
virtual void on_event_connect_retried (const zmq_event_t &event_,
const char *addr_)
{
(void) event_;
(void) addr_;
}
virtual void on_event_listening (const zmq_event_t &event_,
const char *addr_)
{
(void) event_;
(void) addr_;
}
virtual void on_event_bind_failed (const zmq_event_t &event_,
const char *addr_)
{
(void) event_;
(void) addr_;
}
virtual void on_event_accepted (const zmq_event_t &event_,
const char *addr_)
{
(void) event_;
(void) addr_;
}
virtual void on_event_accept_failed (const zmq_event_t &event_,
const char *addr_)
{
(void) event_;
(void) addr_;
}
virtual void on_event_closed (const zmq_event_t &event_, const char *addr_)
{
(void) event_;
(void) addr_;
}
virtual void on_event_close_failed (const zmq_event_t &event_,
const char *addr_)
{
(void) event_;
(void) addr_;
}
virtual void on_event_disconnected (const zmq_event_t &event_,
const char *addr_)
{
(void) event_;
(void) addr_;
}
#if ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 2, 3) #if ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 2, 3)
virtual void on_event_handshake_failed_no_detail(const zmq_event_t &event_, const char* addr_) { (void) event_; (void) addr_; } virtual void on_event_handshake_failed_no_detail (const zmq_event_t &event_,
virtual void on_event_handshake_failed_protocol(const zmq_event_t &event_, const char* addr_) { (void) event_; (void) addr_; } const char *addr_)
virtual void on_event_handshake_failed_auth(const zmq_event_t &event_, const char* addr_) { (void) event_; (void) addr_; } {
virtual void on_event_handshake_succeeded(const zmq_event_t &event_, const char* addr_) { (void) event_; (void) addr_; } (void) event_;
(void) addr_;
}
virtual void on_event_handshake_failed_protocol (const zmq_event_t &event_,
const char *addr_)
{
(void) event_;
(void) addr_;
}
virtual void on_event_handshake_failed_auth (const zmq_event_t &event_,
const char *addr_)
{
(void) event_;
(void) addr_;
}
virtual void on_event_handshake_succeeded (const zmq_event_t &event_,
const char *addr_)
{
(void) event_;
(void) addr_;
}
#elif ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 2, 1) #elif ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 2, 1)
virtual void on_event_handshake_failed(const zmq_event_t &event_, const char* addr_) { (void) event_; (void) addr_; } virtual void on_event_handshake_failed (const zmq_event_t &event_,
virtual void on_event_handshake_succeed(const zmq_event_t &event_, const char* addr_) { (void) event_; (void) addr_; } const char *addr_)
{
(void) event_;
(void) addr_;
}
virtual void on_event_handshake_succeed (const zmq_event_t &event_,
const char *addr_)
{
(void) event_;
(void) addr_;
}
#endif #endif
virtual void on_event_unknown(const zmq_event_t &event_, const char* addr_) { (void)event_; (void)addr_; } virtual void on_event_unknown (const zmq_event_t &event_, const char *addr_)
private: {
(void) event_;
(void) addr_;
}
private:
monitor_t (const monitor_t &) ZMQ_DELETED_FUNCTION; monitor_t (const monitor_t &) ZMQ_DELETED_FUNCTION;
void operator= (const monitor_t &) ZMQ_DELETED_FUNCTION; void operator= (const monitor_t &) ZMQ_DELETED_FUNCTION;
@ -1017,36 +1059,40 @@ namespace zmq
void *monitor_socket; void *monitor_socket;
}; };
#if defined(ZMQ_BUILD_DRAFT_API) && defined(ZMQ_CPP11) && defined(ZMQ_HAVE_POLLER) #if defined(ZMQ_BUILD_DRAFT_API) && defined(ZMQ_CPP11) \
template <typename T = void> && defined(ZMQ_HAVE_POLLER)
class poller_t template <typename T = void> class poller_t
{ {
public: public:
void add (zmq::socket_t &socket, short events, T *user_data) void add (zmq::socket_t &socket, short events, T *user_data)
{ {
if (0 != zmq_poller_add (poller_ptr.get (), static_cast<void*>(socket), user_data, events)) if (0
{ != zmq_poller_add (poller_ptr.get (), static_cast<void *> (socket),
user_data, events)) {
throw error_t (); throw error_t ();
} }
} }
void remove (zmq::socket_t &socket) void remove (zmq::socket_t &socket)
{ {
if (0 != zmq_poller_remove (poller_ptr.get (), static_cast<void*>(socket))) if (0
{ != zmq_poller_remove (poller_ptr.get (),
static_cast<void *> (socket))) {
throw error_t (); throw error_t ();
} }
} }
void modify (zmq::socket_t &socket, short events) void modify (zmq::socket_t &socket, short events)
{ {
if (0 != zmq_poller_modify (poller_ptr.get (), static_cast<void*>(socket), events)) if (0
{ != zmq_poller_modify (poller_ptr.get (),
static_cast<void *> (socket), events)) {
throw error_t (); throw error_t ();
} }
} }
int wait_all (std::vector<zmq_poller_event_t> &poller_events, const std::chrono::microseconds timeout) int wait_all (std::vector<zmq_poller_event_t> &poller_events,
const std::chrono::microseconds timeout)
{ {
int rc = zmq_poller_wait_all (poller_ptr.get (), poller_events.data (), int rc = zmq_poller_wait_all (poller_ptr.get (), poller_events.data (),
static_cast<int> (poller_events.size ()), static_cast<int> (poller_events.size ()),
@ -1063,9 +1109,9 @@ namespace zmq
throw error_t (); throw error_t ();
} }
private: private:
std::unique_ptr<void, std::function<void(void*)>> poller_ptr std::unique_ptr<void, std::function<void(void *)>> poller_ptr{
{
[]() { []() {
auto poller_new = zmq_poller_new (); auto poller_new = zmq_poller_new ();
if (poller_new) if (poller_new)
@ -1075,8 +1121,7 @@ namespace zmq
[](void *ptr) { [](void *ptr) {
int rc = zmq_poller_destroy (&ptr); int rc = zmq_poller_destroy (&ptr);
ZMQ_ASSERT (rc == 0); ZMQ_ASSERT (rc == 0);
} }};
};
}; };
#endif // defined(ZMQ_BUILD_DRAFT_API) && defined(ZMQ_CPP11) && defined(ZMQ_HAVE_POLLER) #endif // defined(ZMQ_BUILD_DRAFT_API) && defined(ZMQ_CPP11) && defined(ZMQ_HAVE_POLLER)

View File

@ -31,8 +31,8 @@
#include <sstream> #include <sstream>
#include <stdexcept> #include <stdexcept>
namespace zmq { namespace zmq
{
#ifdef ZMQ_HAS_RVALUE_REFS #ifdef ZMQ_HAS_RVALUE_REFS
/* /*
@ -48,46 +48,30 @@ private:
std::deque<message_t> m_parts; std::deque<message_t> m_parts;
public: public:
typedef std::deque<message_t>::iterator iterator; typedef std::deque<message_t>::iterator iterator;
typedef std::deque<message_t>::const_iterator const_iterator; typedef std::deque<message_t>::const_iterator const_iterator;
typedef std::deque<message_t>::reverse_iterator reverse_iterator; typedef std::deque<message_t>::reverse_iterator reverse_iterator;
typedef std::deque<message_t>::const_reverse_iterator const_reverse_iterator; typedef std::deque<message_t>::const_reverse_iterator
const_reverse_iterator;
// Default constructor // Default constructor
multipart_t() multipart_t () {}
{}
// Construct from socket receive // Construct from socket receive
multipart_t(socket_t& socket) multipart_t (socket_t &socket) { recv (socket); }
{
recv(socket);
}
// Construct from memory block // Construct from memory block
multipart_t(const void *src, size_t size) multipart_t (const void *src, size_t size) { addmem (src, size); }
{
addmem(src, size);
}
// Construct from string // Construct from string
multipart_t(const std::string& string) multipart_t (const std::string &string) { addstr (string); }
{
addstr(string);
}
// Construct from message part // Construct from message part
multipart_t(message_t&& message) multipart_t (message_t &&message) { add (std::move (message)); }
{
add(std::move(message));
}
// Move constructor // Move constructor
multipart_t(multipart_t&& other) multipart_t (multipart_t &&other) { m_parts = std::move (other.m_parts); }
{
m_parts = std::move(other.m_parts);
}
// Move assignment operator // Move assignment operator
multipart_t &operator= (multipart_t &&other) multipart_t &operator= (multipart_t &&other)
@ -97,106 +81,51 @@ public:
} }
// Destructor // Destructor
virtual ~multipart_t() virtual ~multipart_t () { clear (); }
{
clear();
}
message_t& operator[] (size_t n) message_t &operator[] (size_t n) { return m_parts[n]; }
{
return m_parts[n];
}
const message_t& operator[] (size_t n) const const message_t &operator[] (size_t n) const { return m_parts[n]; }
{
return m_parts[n];
}
message_t& at (size_t n) message_t &at (size_t n) { return m_parts.at (n); }
{
return m_parts.at(n);
}
const message_t& at (size_t n) const const message_t &at (size_t n) const { return m_parts.at (n); }
{
return m_parts.at(n);
}
iterator begin() iterator begin () { return m_parts.begin (); }
{
return m_parts.begin();
}
const_iterator begin() const const_iterator begin () const { return m_parts.begin (); }
{
return m_parts.begin();
}
const_iterator cbegin() const const_iterator cbegin () const { return m_parts.cbegin (); }
{
return m_parts.cbegin();
}
reverse_iterator rbegin() reverse_iterator rbegin () { return m_parts.rbegin (); }
{
return m_parts.rbegin();
}
const_reverse_iterator rbegin() const const_reverse_iterator rbegin () const { return m_parts.rbegin (); }
{
return m_parts.rbegin();
}
iterator end() iterator end () { return m_parts.end (); }
{
return m_parts.end();
}
const_iterator end() const const_iterator end () const { return m_parts.end (); }
{
return m_parts.end();
}
const_iterator cend() const const_iterator cend () const { return m_parts.cend (); }
{
return m_parts.cend();
}
reverse_iterator rend() reverse_iterator rend () { return m_parts.rend (); }
{
return m_parts.rend();
}
const_reverse_iterator rend() const const_reverse_iterator rend () const { return m_parts.rend (); }
{
return m_parts.rend();
}
// Delete all parts // Delete all parts
void clear() void clear () { m_parts.clear (); }
{
m_parts.clear();
}
// Get number of parts // Get number of parts
size_t size() const size_t size () const { return m_parts.size (); }
{
return m_parts.size();
}
// Check if number of parts is zero // Check if number of parts is zero
bool empty() const bool empty () const { return m_parts.empty (); }
{
return m_parts.empty();
}
// Receive multipart message from socket // Receive multipart message from socket
bool recv (socket_t &socket, int flags = 0) bool recv (socket_t &socket, int flags = 0)
{ {
clear (); clear ();
bool more = true; bool more = true;
while (more) while (more) {
{
message_t message; message_t message;
if (!socket.recv (&message, flags)) if (!socket.recv (&message, flags))
return false; return false;
@ -211,8 +140,7 @@ public:
{ {
flags &= ~(ZMQ_SNDMORE); flags &= ~(ZMQ_SNDMORE);
bool more = size () > 0; bool more = size () > 0;
while (more) while (more) {
{
message_t message = pop (); message_t message = pop ();
more = size () > 0; more = size () > 0;
if (!socket.send (message, (more ? ZMQ_SNDMORE : 0) | flags)) if (!socket.send (message, (more ? ZMQ_SNDMORE : 0) | flags))
@ -261,18 +189,18 @@ public:
} }
// Push type (fixed-size) to front // Push type (fixed-size) to front
template<typename T> template <typename T> void pushtyp (const T &type)
void pushtyp(const T& type)
{ {
static_assert(!std::is_same<T, std::string>::value, "Use pushstr() instead of pushtyp<std::string>()"); static_assert (!std::is_same<T, std::string>::value,
"Use pushstr() instead of pushtyp<std::string>()");
m_parts.push_front (message_t (&type, sizeof (type))); m_parts.push_front (message_t (&type, sizeof (type)));
} }
// Push type (fixed-size) to back // Push type (fixed-size) to back
template<typename T> template <typename T> void addtyp (const T &type)
void addtyp(const T& type)
{ {
static_assert(!std::is_same<T, std::string>::value, "Use addstr() instead of addtyp<std::string>()"); static_assert (!std::is_same<T, std::string>::value,
"Use addstr() instead of addtyp<std::string>()");
m_parts.push_back (message_t (&type, sizeof (type))); m_parts.push_back (message_t (&type, sizeof (type)));
} }
@ -283,26 +211,25 @@ public:
} }
// Push message part to back // Push message part to back
void add(message_t&& message) void add (message_t &&message) { m_parts.push_back (std::move (message)); }
{
m_parts.push_back(std::move(message));
}
// Pop string from front // Pop string from front
std::string popstr () std::string popstr ()
{ {
std::string string(m_parts.front().data<char>(), m_parts.front().size()); std::string string (m_parts.front ().data<char> (),
m_parts.front ().size ());
m_parts.pop_front (); m_parts.pop_front ();
return string; return string;
} }
// Pop type (fixed-size) from front // Pop type (fixed-size) from front
template<typename T> template <typename T> T poptyp ()
T poptyp()
{ {
static_assert(!std::is_same<T, std::string>::value, "Use popstr() instead of poptyp<std::string>()"); static_assert (!std::is_same<T, std::string>::value,
"Use popstr() instead of poptyp<std::string>()");
if (sizeof (T) != m_parts.front ().size ()) if (sizeof (T) != m_parts.front ().size ())
throw std::runtime_error("Invalid type, size does not match the message size"); throw std::runtime_error (
"Invalid type, size does not match the message size");
T type = *m_parts.front ().data<T> (); T type = *m_parts.front ().data<T> ();
m_parts.pop_front (); m_parts.pop_front ();
return type; return type;
@ -325,32 +252,30 @@ public:
} }
// Get pointer to a specific message part // Get pointer to a specific message part
const message_t* peek(size_t index) const const message_t *peek (size_t index) const { return &m_parts[index]; }
{
return &m_parts[index];
}
// Get a string copy of a specific message part // Get a string copy of a specific message part
std::string peekstr (size_t index) const std::string peekstr (size_t index) const
{ {
std::string string(m_parts[index].data<char>(), m_parts[index].size()); std::string string (m_parts[index].data<char> (),
m_parts[index].size ());
return string; return string;
} }
// Peek type (fixed-size) from front // Peek type (fixed-size) from front
template<typename T> template <typename T> T peektyp (size_t index) const
T peektyp(size_t index) const
{ {
static_assert(!std::is_same<T, std::string>::value, "Use peekstr() instead of peektyp<std::string>()"); static_assert (!std::is_same<T, std::string>::value,
"Use peekstr() instead of peektyp<std::string>()");
if (sizeof (T) != m_parts[index].size ()) if (sizeof (T) != m_parts[index].size ())
throw std::runtime_error("Invalid type, size does not match the message size"); throw std::runtime_error (
"Invalid type, size does not match the message size");
T type = *m_parts[index].data<T> (); T type = *m_parts[index].data<T> ();
return type; return type;
} }
// Create multipart from type (fixed-size) // Create multipart from type (fixed-size)
template<typename T> template <typename T> static multipart_t create (const T &type)
static multipart_t create(const T& type)
{ {
multipart_t multipart; multipart_t multipart;
multipart.addtyp (type); multipart.addtyp (type);
@ -370,33 +295,30 @@ public:
std::string str () const std::string str () const
{ {
std::stringstream ss; std::stringstream ss;
for (size_t i = 0; i < m_parts.size(); i++) for (size_t i = 0; i < m_parts.size (); i++) {
{
const unsigned char *data = m_parts[i].data<unsigned char> (); const unsigned char *data = m_parts[i].data<unsigned char> ();
size_t size = m_parts[i].size (); size_t size = m_parts[i].size ();
// Dump the message as text or binary // Dump the message as text or binary
bool isText = true; bool isText = true;
for (size_t j = 0; j < size; j++) for (size_t j = 0; j < size; j++) {
{ if (data[j] < 32 || data[j] > 127) {
if (data[j] < 32 || data[j] > 127)
{
isText = false; isText = false;
break; break;
} }
} }
ss << "\n[" << std::dec << std::setw(3) << std::setfill('0') << size << "] "; ss << "\n[" << std::dec << std::setw (3) << std::setfill ('0')
if (size >= 1000) << size << "] ";
{ if (size >= 1000) {
ss << "... (to big to print)"; ss << "... (to big to print)";
continue; continue;
} }
for (size_t j = 0; j < size; j++) for (size_t j = 0; j < size; j++) {
{
if (isText) if (isText)
ss << static_cast<char> (data[j]); ss << static_cast<char> (data[j]);
else else
ss << std::hex << std::setw(2) << std::setfill('0') << static_cast<short>(data[j]); ss << std::hex << std::setw (2) << std::setfill ('0')
<< static_cast<short> (data[j]);
} }
} }
return ss.str (); return ss.str ();
@ -426,7 +348,8 @@ inline std::ostream& operator<<(std::ostream& os, const multipart_t& msg)
#endif // ZMQ_HAS_RVALUE_REFS #endif // ZMQ_HAS_RVALUE_REFS
#if defined(ZMQ_BUILD_DRAFT_API) && defined(ZMQ_CPP11) && defined(ZMQ_HAVE_POLLER) #if defined(ZMQ_BUILD_DRAFT_API) && defined(ZMQ_CPP11) \
&& defined(ZMQ_HAVE_POLLER)
class active_poller_t class active_poller_t
{ {
public: public:
@ -445,17 +368,18 @@ inline std::ostream& operator<<(std::ostream& os, const multipart_t& msg)
{ {
auto it = decltype (handlers)::iterator{}; auto it = decltype (handlers)::iterator{};
auto inserted = bool{}; auto inserted = bool{};
std::tie(it, inserted) = handlers.emplace (static_cast<void*>(socket), std::make_shared<handler_t> (std::move (handler))); std::tie (it, inserted) =
try handlers.emplace (static_cast<void *> (socket),
{ std::make_shared<handler_t> (std::move (handler)));
base_poller.add (socket, events, inserted && *(it->second) ? it->second.get() : nullptr); try {
base_poller.add (socket, events,
inserted && *(it->second) ? it->second.get ()
: nullptr);
need_rebuild |= inserted; need_rebuild |= inserted;
} }
catch (const zmq::error_t&) catch (const zmq::error_t &) {
{
// rollback // rollback
if (inserted) if (inserted) {
{
handlers.erase (static_cast<void *> (socket)); handlers.erase (static_cast<void *> (socket));
} }
throw; throw;
@ -487,24 +411,20 @@ inline std::ostream& operator<<(std::ostream& os, const multipart_t& msg)
} }
const int count = base_poller.wait_all (poller_events, timeout); const int count = base_poller.wait_all (poller_events, timeout);
if (count != 0) { if (count != 0) {
std::for_each (poller_events.begin (), poller_events.begin () + count, std::for_each (poller_events.begin (),
poller_events.begin () + count,
[](zmq_poller_event_t &event) { [](zmq_poller_event_t &event) {
if (event.user_data != NULL) if (event.user_data != NULL)
(*reinterpret_cast<handler_t*> (event.user_data)) (event.events); (*reinterpret_cast<handler_t *> (
event.user_data)) (event.events);
}); });
} }
return count; return count;
} }
bool empty () const bool empty () const { return handlers.empty (); }
{
return handlers.empty ();
}
size_t size () const size_t size () const { return handlers.size (); }
{
return handlers.size ();
}
private: private:
bool need_rebuild{false}; bool need_rebuild{false};