Updated libzmq to match RFC 23, 24, 25, 26

* Command names changed from null terminated to length-specified
* Command frames use the correct flag (bit 2)
* test_stream acts as test case for command frames
* Some code cleanups
This commit is contained in:
Pieter Hintjens
2013-09-04 17:59:45 +02:00
parent 1844a27c82
commit 28b0a5fa27
18 changed files with 114 additions and 107 deletions

View File

@@ -395,7 +395,6 @@ bool zmq::stream_engine_t::handshake ()
{
zmq_assert (handshaking);
zmq_assert (greeting_bytes_read < greeting_size);
// Receive the greeting.
while (greeting_bytes_read < greeting_size) {
const int n = read (greeting_recv + greeting_bytes_read,
@@ -492,8 +491,8 @@ bool zmq::stream_engine_t::handshake ()
insize = greeting_bytes_read;
// To allow for interoperability with peers that do not forward
// their subscriptions, we inject a phony subscription
// message into the incomming message stream.
// their subscriptions, we inject a phantom subscription message
// message into the incoming message stream.
if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB)
subscription_required = true;
}
@@ -550,9 +549,8 @@ bool zmq::stream_engine_t::handshake ()
error ();
return false;
}
read_msg = &stream_engine_t::next_handshake_message;
write_msg = &stream_engine_t::process_handshake_message;
read_msg = &stream_engine_t::next_handshake_command;
write_msg = &stream_engine_t::process_handshake_command;
}
// Start polling for output if necessary.
@@ -598,12 +596,13 @@ int zmq::stream_engine_t::write_identity (msg_t *msg_)
return 0;
}
int zmq::stream_engine_t::next_handshake_message (msg_t *msg_)
int zmq::stream_engine_t::next_handshake_command (msg_t *msg_)
{
zmq_assert (mechanism != NULL);
const int rc = mechanism->next_handshake_message (msg_);
const int rc = mechanism->next_handshake_command (msg_);
if (rc == 0) {
msg_->set_flags (msg_t::command);
if (mechanism->is_handshake_complete ())
mechanism_ready ();
}
@@ -611,11 +610,10 @@ int zmq::stream_engine_t::next_handshake_message (msg_t *msg_)
return rc;
}
int zmq::stream_engine_t::process_handshake_message (msg_t *msg_)
int zmq::stream_engine_t::process_handshake_command (msg_t *msg_)
{
zmq_assert (mechanism != NULL);
const int rc = mechanism->process_handshake_message (msg_);
const int rc = mechanism->process_handshake_command (msg_);
if (rc == 0) {
if (mechanism->is_handshake_complete ())
mechanism_ready ();