diff --git a/src/encoder.hpp b/src/encoder.hpp index 3ebb095e..56b41246 100644 --- a/src/encoder.hpp +++ b/src/encoder.hpp @@ -60,7 +60,7 @@ namespace zmq { free (buf); } - + // The function returns a batch of binary data. The data // are filled to a supplied buffer. If no buffer is supplied (data_ // points to NULL) decoder object will provide buffer of its own. diff --git a/src/norm_engine.cpp b/src/norm_engine.cpp index f1c99695..4bb54766 100644 --- a/src/norm_engine.cpp +++ b/src/norm_engine.cpp @@ -15,7 +15,7 @@ zmq::norm_engine_t::norm_engine_t(io_thread_t* parent_, norm_instance(NORM_INSTANCE_INVALID), norm_session(NORM_SESSION_INVALID), is_sender(false), is_receiver(false), zmq_encoder(0), tx_more_bit(false), zmq_output_ready(false), - norm_tx_stream(NORM_OBJECT_INVALID), norm_tx_ready(false), + norm_tx_stream(NORM_OBJECT_INVALID), norm_tx_ready(false), tx_index(0), tx_len(0), zmq_input_ready(false) { @@ -267,12 +267,14 @@ void zmq::norm_engine_t::send_data() // Buffer contained end of message (should we flush?) //NormStreamMarkEom(norm_tx_stream); // Note this makes NORM fairly chatty for low duty cycle messaging + // but makes sure content is delivered quickly. Positive acknowledgements + // with flush override would make NORM more succinct here NormStreamFlush(norm_tx_stream, true, NORM_FLUSH_ACTIVE); } tx_index = tx_len = 0; // all buffered data was written } // Still norm_tx_ready, so ask for more data from zmq_session - if (!zmq_encoder.has_data()) + if (!zmq_encoder.has_data()) { // Existing message had no more data to encode if (-1 == zmq_session->pull_msg(&tx_msg)) @@ -333,20 +335,19 @@ void zmq::norm_engine_t::in_event() break; case NORM_RX_OBJECT_ABORTED: + { + NormRxStreamState* rxState = (NormRxStreamState*)NormObjectGetUserData(event.object); + if (NULL != rxState) { - NormRxStreamState* rxState = (NormRxStreamState*)NormObjectGetUserData(event.object); - if (NULL != rxState) - { - // Remove the state from the list it's in - // This is now unnecessary since deletion takes care of list removal - // but in the interest of being clear ... - NormRxStreamState::List* list = rxState->AccessList(); - if (NULL != list) list->Remove(*rxState); - } - delete rxState; + // Remove the state from the list it's in + // This is now unnecessary since deletion takes care of list removal + // but in the interest of being clear ... + NormRxStreamState::List* list = rxState->AccessList(); + if (NULL != list) list->Remove(*rxState); } + delete rxState; break; - + } case NORM_REMOTE_SENDER_INACTIVE: // Here we free resources used for this formerly active sender. // Note w/ NORM_SYNC_STREAM, if sender reactivates, we may diff --git a/src/norm_engine.hpp b/src/norm_engine.hpp index 29fbe7fd..4432ce57 100644 --- a/src/norm_engine.hpp +++ b/src/norm_engine.hpp @@ -2,8 +2,6 @@ #ifndef __ZMQ_NORM_ENGINE_HPP_INCLUDED__ #define __ZMQ_NORM_ENGINE_HPP_INCLUDED__ -#define ZMQ_HAVE_NORM 1 - #if defined ZMQ_HAVE_NORM #include "io_object.hpp" @@ -12,7 +10,7 @@ #include "v2_decoder.hpp" #include "v2_encoder.hpp" -#include +#include namespace zmq {