mirror of
https://github.com/zeromq/libzmq.git
synced 2025-10-26 18:42:43 +01:00
Problem: source conatins trailing spaces
Solution: remove them
This commit is contained in:
@@ -119,7 +119,7 @@ namespace zmq
|
|||||||
bytes_used_ = size_;
|
bytes_used_ = size_;
|
||||||
|
|
||||||
while (!to_read) {
|
while (!to_read) {
|
||||||
const int rc =
|
const int rc =
|
||||||
(static_cast <T *> (this)->*next) (data_ + bytes_used_);
|
(static_cast <T *> (this)->*next) (data_ + bytes_used_);
|
||||||
if (rc != 0)
|
if (rc != 0)
|
||||||
return rc;
|
return rc;
|
||||||
|
|||||||
@@ -9,14 +9,14 @@
|
|||||||
|
|
||||||
zmq::norm_engine_t::norm_engine_t(io_thread_t* parent_,
|
zmq::norm_engine_t::norm_engine_t(io_thread_t* parent_,
|
||||||
const options_t& options_)
|
const options_t& options_)
|
||||||
: io_object_t(parent_), zmq_session(NULL), options(options_),
|
: io_object_t(parent_), zmq_session(NULL), options(options_),
|
||||||
norm_instance(NORM_INSTANCE_INVALID), norm_session(NORM_SESSION_INVALID),
|
norm_instance(NORM_INSTANCE_INVALID), norm_session(NORM_SESSION_INVALID),
|
||||||
is_sender(false), is_receiver(false),
|
is_sender(false), is_receiver(false),
|
||||||
zmq_encoder(0), norm_tx_stream(NORM_OBJECT_INVALID),
|
zmq_encoder(0), norm_tx_stream(NORM_OBJECT_INVALID),
|
||||||
tx_first_msg(true), tx_more_bit(false),
|
tx_first_msg(true), tx_more_bit(false),
|
||||||
zmq_output_ready(false), norm_tx_ready(false),
|
zmq_output_ready(false), norm_tx_ready(false),
|
||||||
tx_index(0), tx_len(0),
|
tx_index(0), tx_len(0),
|
||||||
zmq_input_ready(false)
|
zmq_input_ready(false)
|
||||||
{
|
{
|
||||||
int rc = tx_msg.init();
|
int rc = tx_msg.init();
|
||||||
errno_assert(0 == rc);
|
errno_assert(0 == rc);
|
||||||
@@ -50,7 +50,7 @@ int zmq::norm_engine_t::init(const char* network_, bool send, bool recv)
|
|||||||
{
|
{
|
||||||
ifacePtr = network_;
|
ifacePtr = network_;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Second, look for optional multicast ifaceName
|
// Second, look for optional multicast ifaceName
|
||||||
char ifaceName[256];
|
char ifaceName[256];
|
||||||
const char* addrPtr = strchr(ifacePtr, ';');
|
const char* addrPtr = strchr(ifacePtr, ';');
|
||||||
@@ -68,7 +68,7 @@ int zmq::norm_engine_t::init(const char* network_, bool send, bool recv)
|
|||||||
addrPtr = ifacePtr;
|
addrPtr = ifacePtr;
|
||||||
ifacePtr = NULL;
|
ifacePtr = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Finally, parse IP address and port number
|
// Finally, parse IP address and port number
|
||||||
const char* portPtr = strrchr(addrPtr, ':');
|
const char* portPtr = strrchr(addrPtr, ':');
|
||||||
if (NULL == portPtr)
|
if (NULL == portPtr)
|
||||||
@@ -76,7 +76,7 @@ int zmq::norm_engine_t::init(const char* network_, bool send, bool recv)
|
|||||||
errno = EINVAL;
|
errno = EINVAL;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
char addr[256];
|
char addr[256];
|
||||||
size_t addrLen = portPtr - addrPtr;
|
size_t addrLen = portPtr - addrPtr;
|
||||||
if (addrLen > 255) addrLen = 255;
|
if (addrLen > 255) addrLen = 255;
|
||||||
@@ -84,7 +84,7 @@ int zmq::norm_engine_t::init(const char* network_, bool send, bool recv)
|
|||||||
addr[addrLen] = '\0';
|
addr[addrLen] = '\0';
|
||||||
portPtr++;
|
portPtr++;
|
||||||
unsigned short portNumber = atoi(portPtr);
|
unsigned short portNumber = atoi(portPtr);
|
||||||
|
|
||||||
if (NORM_INSTANCE_INVALID == norm_instance)
|
if (NORM_INSTANCE_INVALID == norm_instance)
|
||||||
{
|
{
|
||||||
if (NORM_INSTANCE_INVALID == (norm_instance = NormCreateInstance()))
|
if (NORM_INSTANCE_INVALID == (norm_instance = NormCreateInstance()))
|
||||||
@@ -93,14 +93,14 @@ int zmq::norm_engine_t::init(const char* network_, bool send, bool recv)
|
|||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TBD - What do we use for our local NormNodeId?
|
// TBD - What do we use for our local NormNodeId?
|
||||||
// (for now we use automatic, IP addr based assignment or passed in 'id')
|
// (for now we use automatic, IP addr based assignment or passed in 'id')
|
||||||
// a) Use ZMQ Identity somehow?
|
// a) Use ZMQ Identity somehow?
|
||||||
// b) Add function to use iface addr
|
// b) Add function to use iface addr
|
||||||
// c) Randomize and implement a NORM session layer
|
// c) Randomize and implement a NORM session layer
|
||||||
// conflict detection/resolution protocol
|
// conflict detection/resolution protocol
|
||||||
|
|
||||||
norm_session = NormCreateSession(norm_instance, addr, portNumber, localId);
|
norm_session = NormCreateSession(norm_instance, addr, portNumber, localId);
|
||||||
if (NORM_SESSION_INVALID == norm_session)
|
if (NORM_SESSION_INVALID == norm_session)
|
||||||
{
|
{
|
||||||
@@ -129,7 +129,7 @@ int zmq::norm_engine_t::init(const char* network_, bool send, bool recv)
|
|||||||
NormSetMulticastInterface(norm_session, ifacePtr);
|
NormSetMulticastInterface(norm_session, ifacePtr);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (recv)
|
if (recv)
|
||||||
{
|
{
|
||||||
// The alternative NORM_SYNC_CURRENT here would provide "instant"
|
// The alternative NORM_SYNC_CURRENT here would provide "instant"
|
||||||
@@ -148,7 +148,7 @@ int zmq::norm_engine_t::init(const char* network_, bool send, bool recv)
|
|||||||
}
|
}
|
||||||
is_receiver = true;
|
is_receiver = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (send)
|
if (send)
|
||||||
{
|
{
|
||||||
// Pick a random sender instance id (aka norm sender session id)
|
// Pick a random sender instance id (aka norm sender session id)
|
||||||
@@ -163,10 +163,10 @@ int zmq::norm_engine_t::init(const char* network_, bool send, bool recv)
|
|||||||
norm_instance = NORM_INSTANCE_INVALID;
|
norm_instance = NORM_INSTANCE_INVALID;
|
||||||
errno = savedErrno;
|
errno = savedErrno;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
NormSetCongestionControl(norm_session, true);
|
NormSetCongestionControl(norm_session, true);
|
||||||
norm_tx_ready = true;
|
norm_tx_ready = true;
|
||||||
is_sender = true;
|
is_sender = true;
|
||||||
if (NORM_OBJECT_INVALID == (norm_tx_stream = NormStreamOpen(norm_session, 2*1024*1024)))
|
if (NORM_OBJECT_INVALID == (norm_tx_stream = NormStreamOpen(norm_session, 2*1024*1024)))
|
||||||
{
|
{
|
||||||
// errno set by whatever failed
|
// errno set by whatever failed
|
||||||
@@ -178,11 +178,11 @@ int zmq::norm_engine_t::init(const char* network_, bool send, bool recv)
|
|||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
//NormSetMessageTrace(norm_session, true);
|
//NormSetMessageTrace(norm_session, true);
|
||||||
//NormSetDebugLevel(3);
|
//NormSetDebugLevel(3);
|
||||||
//NormOpenDebugLog(norm_instance, "normLog.txt");
|
//NormOpenDebugLog(norm_instance, "normLog.txt");
|
||||||
|
|
||||||
return 0; // no error
|
return 0; // no error
|
||||||
} // end zmq::norm_engine_t::init()
|
} // end zmq::norm_engine_t::init()
|
||||||
|
|
||||||
@@ -192,12 +192,12 @@ void zmq::norm_engine_t::shutdown()
|
|||||||
if (is_receiver)
|
if (is_receiver)
|
||||||
{
|
{
|
||||||
NormStopReceiver(norm_session);
|
NormStopReceiver(norm_session);
|
||||||
|
|
||||||
// delete any active NormRxStreamState
|
// delete any active NormRxStreamState
|
||||||
rx_pending_list.Destroy();
|
rx_pending_list.Destroy();
|
||||||
rx_ready_list.Destroy();
|
rx_ready_list.Destroy();
|
||||||
msg_ready_list.Destroy();
|
msg_ready_list.Destroy();
|
||||||
|
|
||||||
is_receiver = false;
|
is_receiver = false;
|
||||||
}
|
}
|
||||||
if (is_sender)
|
if (is_sender)
|
||||||
@@ -224,20 +224,20 @@ void zmq::norm_engine_t::plug (io_thread_t* io_thread_, session_base_t *session_
|
|||||||
zmq_session = session_;
|
zmq_session = session_;
|
||||||
if (is_sender) zmq_output_ready = true;
|
if (is_sender) zmq_output_ready = true;
|
||||||
if (is_receiver) zmq_input_ready = true;
|
if (is_receiver) zmq_input_ready = true;
|
||||||
|
|
||||||
fd_t normDescriptor = NormGetDescriptor(norm_instance);
|
fd_t normDescriptor = NormGetDescriptor(norm_instance);
|
||||||
norm_descriptor_handle = add_fd(normDescriptor);
|
norm_descriptor_handle = add_fd(normDescriptor);
|
||||||
// Set POLLIN for notification of pending NormEvents
|
// Set POLLIN for notification of pending NormEvents
|
||||||
set_pollin(norm_descriptor_handle);
|
set_pollin(norm_descriptor_handle);
|
||||||
|
|
||||||
if (is_sender) send_data();
|
if (is_sender) send_data();
|
||||||
|
|
||||||
} // end zmq::norm_engine_t::init()
|
} // end zmq::norm_engine_t::init()
|
||||||
|
|
||||||
void zmq::norm_engine_t::unplug()
|
void zmq::norm_engine_t::unplug()
|
||||||
{
|
{
|
||||||
rm_fd(norm_descriptor_handle);
|
rm_fd(norm_descriptor_handle);
|
||||||
|
|
||||||
zmq_session = NULL;
|
zmq_session = NULL;
|
||||||
} // end zmq::norm_engine_t::unplug()
|
} // end zmq::norm_engine_t::unplug()
|
||||||
|
|
||||||
@@ -253,7 +253,7 @@ void zmq::norm_engine_t::restart_output()
|
|||||||
// There's new message data available from the session
|
// There's new message data available from the session
|
||||||
zmq_output_ready = true;
|
zmq_output_ready = true;
|
||||||
if (norm_tx_ready) send_data();
|
if (norm_tx_ready) send_data();
|
||||||
|
|
||||||
} // end zmq::norm_engine_t::restart_output()
|
} // end zmq::norm_engine_t::restart_output()
|
||||||
|
|
||||||
void zmq::norm_engine_t::send_data()
|
void zmq::norm_engine_t::send_data()
|
||||||
@@ -289,7 +289,7 @@ void zmq::norm_engine_t::send_data()
|
|||||||
// Need to pull and load a new message to send
|
// Need to pull and load a new message to send
|
||||||
if (-1 == zmq_session->pull_msg(&tx_msg))
|
if (-1 == zmq_session->pull_msg(&tx_msg))
|
||||||
{
|
{
|
||||||
// We need to wait for "restart_output()" to be called by ZMQ
|
// We need to wait for "restart_output()" to be called by ZMQ
|
||||||
zmq_output_ready = false;
|
zmq_output_ready = false;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
@@ -301,7 +301,7 @@ void zmq::norm_engine_t::send_data()
|
|||||||
// 'syncs' mid-stream. We key off the the state of the 'more_flag'
|
// 'syncs' mid-stream. We key off the the state of the 'more_flag'
|
||||||
// I.e.,If more_flag _was_ false previously, this is the first
|
// I.e.,If more_flag _was_ false previously, this is the first
|
||||||
// frame of a ZMQ message.
|
// frame of a ZMQ message.
|
||||||
if (tx_more_bit)
|
if (tx_more_bit)
|
||||||
tx_buffer[0] = (char)0xff; // this is not first frame of message
|
tx_buffer[0] = (char)0xff; // this is not first frame of message
|
||||||
else
|
else
|
||||||
tx_buffer[0] = 0x00; // this is first frame of message
|
tx_buffer[0] = 0x00; // this is first frame of message
|
||||||
@@ -339,7 +339,7 @@ void zmq::norm_engine_t::in_event()
|
|||||||
zmq_assert(false);
|
zmq_assert(false);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
switch(event.type)
|
switch(event.type)
|
||||||
{
|
{
|
||||||
case NORM_TX_QUEUE_VACANCY:
|
case NORM_TX_QUEUE_VACANCY:
|
||||||
@@ -350,13 +350,13 @@ void zmq::norm_engine_t::in_event()
|
|||||||
send_data();
|
send_data();
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
|
|
||||||
case NORM_RX_OBJECT_NEW:
|
case NORM_RX_OBJECT_NEW:
|
||||||
//break;
|
//break;
|
||||||
case NORM_RX_OBJECT_UPDATED:
|
case NORM_RX_OBJECT_UPDATED:
|
||||||
recv_data(event.object);
|
recv_data(event.object);
|
||||||
break;
|
break;
|
||||||
|
|
||||||
case NORM_RX_OBJECT_ABORTED:
|
case NORM_RX_OBJECT_ABORTED:
|
||||||
{
|
{
|
||||||
NormRxStreamState* rxState = (NormRxStreamState*)NormObjectGetUserData(event.object);
|
NormRxStreamState* rxState = (NormRxStreamState*)NormObjectGetUserData(event.object);
|
||||||
@@ -370,7 +370,7 @@ void zmq::norm_engine_t::in_event()
|
|||||||
}
|
}
|
||||||
delete rxState;
|
delete rxState;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case NORM_REMOTE_SENDER_INACTIVE:
|
case NORM_REMOTE_SENDER_INACTIVE:
|
||||||
// Here we free resources used for this formerly active sender.
|
// Here we free resources used for this formerly active sender.
|
||||||
// Note w/ NORM_SYNC_STREAM, if sender reactivates, we may
|
// Note w/ NORM_SYNC_STREAM, if sender reactivates, we may
|
||||||
@@ -380,11 +380,11 @@ void zmq::norm_engine_t::in_event()
|
|||||||
// user configurable timeout here to wait some amount of time
|
// user configurable timeout here to wait some amount of time
|
||||||
// after this event to declare the remote sender truly dead
|
// after this event to declare the remote sender truly dead
|
||||||
// and delete its state???
|
// and delete its state???
|
||||||
NormNodeDelete(event.sender);
|
NormNodeDelete(event.sender);
|
||||||
break;
|
break;
|
||||||
|
|
||||||
default:
|
default:
|
||||||
// We ignore some NORM events
|
// We ignore some NORM events
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
} // zmq::norm_engine_t::in_event()
|
} // zmq::norm_engine_t::in_event()
|
||||||
@@ -396,7 +396,7 @@ void zmq::norm_engine_t::restart_input()
|
|||||||
// Process any pending received messages
|
// Process any pending received messages
|
||||||
if (!msg_ready_list.IsEmpty())
|
if (!msg_ready_list.IsEmpty())
|
||||||
recv_data(NORM_OBJECT_INVALID);
|
recv_data(NORM_OBJECT_INVALID);
|
||||||
|
|
||||||
} // end zmq::norm_engine_t::restart_input()
|
} // end zmq::norm_engine_t::restart_input()
|
||||||
|
|
||||||
void zmq::norm_engine_t::recv_data(NormObjectHandle object)
|
void zmq::norm_engine_t::recv_data(NormObjectHandle object)
|
||||||
@@ -447,19 +447,19 @@ void zmq::norm_engine_t::recv_data(NormObjectHandle object)
|
|||||||
{
|
{
|
||||||
switch(rxState->Decode())
|
switch(rxState->Decode())
|
||||||
{
|
{
|
||||||
case 1: // msg completed
|
case 1: // msg completed
|
||||||
// Complete message decoded, move this stream to msg_ready_list
|
// Complete message decoded, move this stream to msg_ready_list
|
||||||
// to push the message up to the session below. Note the stream
|
// to push the message up to the session below. Note the stream
|
||||||
// will be returned to the "rx_ready_list" after that's done
|
// will be returned to the "rx_ready_list" after that's done
|
||||||
rx_ready_list.Remove(*rxState);
|
rx_ready_list.Remove(*rxState);
|
||||||
msg_ready_list.Append(*rxState);
|
msg_ready_list.Append(*rxState);
|
||||||
continue;
|
continue;
|
||||||
|
|
||||||
case -1: // decoding error (shouldn't happen w/ NORM, but ...)
|
case -1: // decoding error (shouldn't happen w/ NORM, but ...)
|
||||||
// We need to re-sync this stream (decoder buffer was reset)
|
// We need to re-sync this stream (decoder buffer was reset)
|
||||||
rxState->SetSync(false);
|
rxState->SetSync(false);
|
||||||
break;
|
break;
|
||||||
|
|
||||||
default: // 0 - need more data
|
default: // 0 - need more data
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
@@ -524,12 +524,12 @@ void zmq::norm_engine_t::recv_data(NormObjectHandle object)
|
|||||||
rx_pending_list.Append(*rxState);
|
rx_pending_list.Append(*rxState);
|
||||||
}
|
}
|
||||||
} // end while(NULL != (rxState = iterator.GetNextItem()))
|
} // end while(NULL != (rxState = iterator.GetNextItem()))
|
||||||
|
|
||||||
if (zmq_input_ready)
|
if (zmq_input_ready)
|
||||||
{
|
{
|
||||||
// At this point, we've made a pass through the "rx_ready" stream list
|
// At this point, we've made a pass through the "rx_ready" stream list
|
||||||
// Now make a pass through the "msg_pending" list (if the zmq session
|
// Now make a pass through the "msg_pending" list (if the zmq session
|
||||||
// ready for more input). This may possibly return streams back to
|
// ready for more input). This may possibly return streams back to
|
||||||
// the "rx ready" stream list after their pending message is handled
|
// the "rx ready" stream list after their pending message is handled
|
||||||
NormRxStreamState::List::Iterator iterator(msg_ready_list);
|
NormRxStreamState::List::Iterator iterator(msg_ready_list);
|
||||||
NormRxStreamState* rxState;
|
NormRxStreamState* rxState;
|
||||||
@@ -549,7 +549,7 @@ void zmq::norm_engine_t::recv_data(NormObjectHandle object)
|
|||||||
{
|
{
|
||||||
// session rejected message?
|
// session rejected message?
|
||||||
// TBD - handle this better
|
// TBD - handle this better
|
||||||
zmq_assert(false);
|
zmq_assert(false);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// else message was accepted.
|
// else message was accepted.
|
||||||
@@ -561,15 +561,15 @@ void zmq::norm_engine_t::recv_data(NormObjectHandle object)
|
|||||||
} // end while(NULL != (rxState = iterator.GetNextItem()))
|
} // end while(NULL != (rxState = iterator.GetNextItem()))
|
||||||
} // end if (zmq_input_ready)
|
} // end if (zmq_input_ready)
|
||||||
} // end while ((!rx_ready_list.empty() || (zmq_input_ready && !msg_ready_list.empty()))
|
} // end while ((!rx_ready_list.empty() || (zmq_input_ready && !msg_ready_list.empty()))
|
||||||
|
|
||||||
// Alert zmq of the messages we have pushed up
|
// Alert zmq of the messages we have pushed up
|
||||||
zmq_session->flush();
|
zmq_session->flush();
|
||||||
|
|
||||||
} // end zmq::norm_engine_t::recv_data()
|
} // end zmq::norm_engine_t::recv_data()
|
||||||
|
|
||||||
zmq::norm_engine_t::NormRxStreamState::NormRxStreamState(NormObjectHandle normStream,
|
zmq::norm_engine_t::NormRxStreamState::NormRxStreamState(NormObjectHandle normStream,
|
||||||
int64_t maxMsgSize)
|
int64_t maxMsgSize)
|
||||||
: norm_stream(normStream), max_msg_size(maxMsgSize),
|
: norm_stream(normStream), max_msg_size(maxMsgSize),
|
||||||
in_sync(false), rx_ready(false), zmq_decoder(NULL), skip_norm_sync(false),
|
in_sync(false), rx_ready(false), zmq_decoder(NULL), skip_norm_sync(false),
|
||||||
buffer_ptr(NULL), buffer_size(0), buffer_count(0),
|
buffer_ptr(NULL), buffer_size(0), buffer_count(0),
|
||||||
prev(NULL), next(NULL), list(NULL)
|
prev(NULL), next(NULL), list(NULL)
|
||||||
@@ -620,17 +620,17 @@ int zmq::norm_engine_t::NormRxStreamState::Decode()
|
|||||||
{
|
{
|
||||||
// There's pending data for the decoder to decode
|
// There's pending data for the decoder to decode
|
||||||
size_t processed = 0;
|
size_t processed = 0;
|
||||||
|
|
||||||
// This a bit of a kludgy approach used to weed
|
// This a bit of a kludgy approach used to weed
|
||||||
// out the NORM ZMQ message transport "syncFlag" byte
|
// out the NORM ZMQ message transport "syncFlag" byte
|
||||||
// from the ZMQ message stream being decoded (but it works!)
|
// from the ZMQ message stream being decoded (but it works!)
|
||||||
if (skip_norm_sync)
|
if (skip_norm_sync)
|
||||||
{
|
{
|
||||||
buffer_ptr++;
|
buffer_ptr++;
|
||||||
buffer_count--;
|
buffer_count--;
|
||||||
skip_norm_sync = false;
|
skip_norm_sync = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
int rc = zmq_decoder->decode(buffer_ptr, buffer_count, processed);
|
int rc = zmq_decoder->decode(buffer_ptr, buffer_count, processed);
|
||||||
buffer_ptr += processed;
|
buffer_ptr += processed;
|
||||||
buffer_count -= processed;
|
buffer_count -= processed;
|
||||||
@@ -651,7 +651,7 @@ int zmq::norm_engine_t::NormRxStreamState::Decode()
|
|||||||
skip_norm_sync = false; // will get consumed by norm sync check
|
skip_norm_sync = false; // will get consumed by norm sync check
|
||||||
Init();
|
Init();
|
||||||
break;
|
break;
|
||||||
|
|
||||||
case 0:
|
case 0:
|
||||||
// need more data, keep decoding until buffer exhausted
|
// need more data, keep decoding until buffer exhausted
|
||||||
break;
|
break;
|
||||||
@@ -662,7 +662,7 @@ int zmq::norm_engine_t::NormRxStreamState::Decode()
|
|||||||
buffer_size = 0;
|
buffer_size = 0;
|
||||||
zmq_decoder->get_buffer(&buffer_ptr, &buffer_size);
|
zmq_decoder->get_buffer(&buffer_ptr, &buffer_size);
|
||||||
return 0; // need more data
|
return 0; // need more data
|
||||||
|
|
||||||
} // end zmq::norm_engine_t::NormRxStreamState::Decode()
|
} // end zmq::norm_engine_t::NormRxStreamState::Decode()
|
||||||
|
|
||||||
zmq::norm_engine_t::NormRxStreamState::List::List()
|
zmq::norm_engine_t::NormRxStreamState::List::List()
|
||||||
@@ -723,6 +723,6 @@ zmq::norm_engine_t::NormRxStreamState* zmq::norm_engine_t::NormRxStreamState::Li
|
|||||||
if (NULL != nextItem) next_item = nextItem->next;
|
if (NULL != nextItem) next_item = nextItem->next;
|
||||||
return nextItem;
|
return nextItem;
|
||||||
} // end zmq::norm_engine_t::NormRxStreamState::List::Iterator::GetNextItem()
|
} // end zmq::norm_engine_t::NormRxStreamState::List::Iterator::GetNextItem()
|
||||||
|
|
||||||
|
|
||||||
#endif // ZMQ_HAVE_NORM
|
#endif // ZMQ_HAVE_NORM
|
||||||
|
|||||||
@@ -16,17 +16,17 @@ namespace zmq
|
|||||||
{
|
{
|
||||||
class io_thread_t;
|
class io_thread_t;
|
||||||
class session_base_t;
|
class session_base_t;
|
||||||
|
|
||||||
class norm_engine_t : public io_object_t, public i_engine
|
class norm_engine_t : public io_object_t, public i_engine
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
norm_engine_t (zmq::io_thread_t *parent_, const options_t &options_);
|
norm_engine_t (zmq::io_thread_t *parent_, const options_t &options_);
|
||||||
~norm_engine_t ();
|
~norm_engine_t ();
|
||||||
|
|
||||||
// create NORM instance, session, etc
|
// create NORM instance, session, etc
|
||||||
int init(const char* network_, bool send, bool recv);
|
int init(const char* network_, bool send, bool recv);
|
||||||
void shutdown();
|
void shutdown();
|
||||||
|
|
||||||
// i_engine interface implementation.
|
// i_engine interface implementation.
|
||||||
// Plug the engine to the session.
|
// Plug the engine to the session.
|
||||||
virtual void plug (zmq::io_thread_t *io_thread_,
|
virtual void plug (zmq::io_thread_t *io_thread_,
|
||||||
@@ -45,43 +45,43 @@ namespace zmq
|
|||||||
virtual void restart_output ();
|
virtual void restart_output ();
|
||||||
|
|
||||||
virtual void zap_msg_available () {};
|
virtual void zap_msg_available () {};
|
||||||
|
|
||||||
// i_poll_events interface implementation.
|
// i_poll_events interface implementation.
|
||||||
// (we only need in_event() for NormEvent notification)
|
// (we only need in_event() for NormEvent notification)
|
||||||
// (i.e., don't have any output events or timers (yet))
|
// (i.e., don't have any output events or timers (yet))
|
||||||
void in_event ();
|
void in_event ();
|
||||||
|
|
||||||
private:
|
private:
|
||||||
void unplug();
|
void unplug();
|
||||||
void send_data();
|
void send_data();
|
||||||
void recv_data(NormObjectHandle stream);
|
void recv_data(NormObjectHandle stream);
|
||||||
|
|
||||||
|
|
||||||
enum {BUFFER_SIZE = 2048};
|
enum {BUFFER_SIZE = 2048};
|
||||||
|
|
||||||
// Used to keep track of streams from multiple senders
|
// Used to keep track of streams from multiple senders
|
||||||
class NormRxStreamState
|
class NormRxStreamState
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
NormRxStreamState(NormObjectHandle normStream,
|
NormRxStreamState(NormObjectHandle normStream,
|
||||||
int64_t maxMsgSize);
|
int64_t maxMsgSize);
|
||||||
~NormRxStreamState();
|
~NormRxStreamState();
|
||||||
|
|
||||||
NormObjectHandle GetStreamHandle() const
|
NormObjectHandle GetStreamHandle() const
|
||||||
{return norm_stream;}
|
{return norm_stream;}
|
||||||
|
|
||||||
bool Init();
|
bool Init();
|
||||||
|
|
||||||
void SetRxReady(bool state)
|
void SetRxReady(bool state)
|
||||||
{rx_ready = state;}
|
{rx_ready = state;}
|
||||||
bool IsRxReady() const
|
bool IsRxReady() const
|
||||||
{return rx_ready;}
|
{return rx_ready;}
|
||||||
|
|
||||||
void SetSync(bool state)
|
void SetSync(bool state)
|
||||||
{in_sync = state;}
|
{in_sync = state;}
|
||||||
bool InSync() const
|
bool InSync() const
|
||||||
{return in_sync;}
|
{return in_sync;}
|
||||||
|
|
||||||
// These are used to feed data to decoder
|
// These are used to feed data to decoder
|
||||||
// and its underlying "msg" buffer
|
// and its underlying "msg" buffer
|
||||||
char* AccessBuffer()
|
char* AccessBuffer()
|
||||||
@@ -98,21 +98,21 @@ namespace zmq
|
|||||||
// occurs the 'sync' is dropped and the
|
// occurs the 'sync' is dropped and the
|
||||||
// decoder re-initialized
|
// decoder re-initialized
|
||||||
int Decode();
|
int Decode();
|
||||||
|
|
||||||
class List
|
class List
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
List();
|
List();
|
||||||
~List();
|
~List();
|
||||||
|
|
||||||
void Append(NormRxStreamState& item);
|
void Append(NormRxStreamState& item);
|
||||||
void Remove(NormRxStreamState& item);
|
void Remove(NormRxStreamState& item);
|
||||||
|
|
||||||
bool IsEmpty() const
|
bool IsEmpty() const
|
||||||
{return (NULL == head);}
|
{return (NULL == head);}
|
||||||
|
|
||||||
void Destroy();
|
void Destroy();
|
||||||
|
|
||||||
class Iterator
|
class Iterator
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
@@ -122,36 +122,36 @@ namespace zmq
|
|||||||
NormRxStreamState* next_item;
|
NormRxStreamState* next_item;
|
||||||
};
|
};
|
||||||
friend class Iterator;
|
friend class Iterator;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
NormRxStreamState* head;
|
NormRxStreamState* head;
|
||||||
NormRxStreamState* tail;
|
NormRxStreamState* tail;
|
||||||
|
|
||||||
}; // end class zmq::norm_engine_t::NormRxStreamState::List
|
}; // end class zmq::norm_engine_t::NormRxStreamState::List
|
||||||
|
|
||||||
friend class List;
|
friend class List;
|
||||||
|
|
||||||
List* AccessList()
|
List* AccessList()
|
||||||
{return list;}
|
{return list;}
|
||||||
|
|
||||||
|
|
||||||
private:
|
private:
|
||||||
NormObjectHandle norm_stream;
|
NormObjectHandle norm_stream;
|
||||||
int64_t max_msg_size;
|
int64_t max_msg_size;
|
||||||
bool in_sync;
|
bool in_sync;
|
||||||
bool rx_ready;
|
bool rx_ready;
|
||||||
v2_decoder_t* zmq_decoder;
|
v2_decoder_t* zmq_decoder;
|
||||||
bool skip_norm_sync;
|
bool skip_norm_sync;
|
||||||
unsigned char* buffer_ptr;
|
unsigned char* buffer_ptr;
|
||||||
size_t buffer_size;
|
size_t buffer_size;
|
||||||
size_t buffer_count;
|
size_t buffer_count;
|
||||||
|
|
||||||
NormRxStreamState* prev;
|
NormRxStreamState* prev;
|
||||||
NormRxStreamState* next;
|
NormRxStreamState* next;
|
||||||
NormRxStreamState::List* list;
|
NormRxStreamState::List* list;
|
||||||
|
|
||||||
}; // end class zmq::norm_engine_t::NormRxStreamState
|
}; // end class zmq::norm_engine_t::NormRxStreamState
|
||||||
|
|
||||||
session_base_t* zmq_session;
|
session_base_t* zmq_session;
|
||||||
options_t options;
|
options_t options;
|
||||||
NormInstanceHandle norm_instance;
|
NormInstanceHandle norm_instance;
|
||||||
@@ -161,25 +161,25 @@ namespace zmq
|
|||||||
bool is_receiver;
|
bool is_receiver;
|
||||||
// Sender state
|
// Sender state
|
||||||
msg_t tx_msg;
|
msg_t tx_msg;
|
||||||
v2_encoder_t zmq_encoder; // for tx messages (we use v2 for now)
|
v2_encoder_t zmq_encoder; // for tx messages (we use v2 for now)
|
||||||
NormObjectHandle norm_tx_stream;
|
NormObjectHandle norm_tx_stream;
|
||||||
bool tx_first_msg;
|
bool tx_first_msg;
|
||||||
bool tx_more_bit;
|
bool tx_more_bit;
|
||||||
bool zmq_output_ready; // zmq has msg(s) to send
|
bool zmq_output_ready; // zmq has msg(s) to send
|
||||||
bool norm_tx_ready; // norm has tx queue vacancy
|
bool norm_tx_ready; // norm has tx queue vacancy
|
||||||
// TBD - maybe don't need buffer if can access zmq message buffer directly?
|
// TBD - maybe don't need buffer if can access zmq message buffer directly?
|
||||||
char tx_buffer[BUFFER_SIZE];
|
char tx_buffer[BUFFER_SIZE];
|
||||||
unsigned int tx_index;
|
unsigned int tx_index;
|
||||||
unsigned int tx_len;
|
unsigned int tx_len;
|
||||||
|
|
||||||
// Receiver state
|
// Receiver state
|
||||||
// Lists of norm rx streams from remote senders
|
// Lists of norm rx streams from remote senders
|
||||||
bool zmq_input_ready; // zmq ready to receive msg(s)
|
bool zmq_input_ready; // zmq ready to receive msg(s)
|
||||||
NormRxStreamState::List rx_pending_list; // rx streams waiting for data reception
|
NormRxStreamState::List rx_pending_list; // rx streams waiting for data reception
|
||||||
NormRxStreamState::List rx_ready_list; // rx streams ready for NormStreamRead()
|
NormRxStreamState::List rx_ready_list; // rx streams ready for NormStreamRead()
|
||||||
NormRxStreamState::List msg_ready_list; // rx streams w/ msg ready for push to zmq
|
NormRxStreamState::List msg_ready_list; // rx streams w/ msg ready for push to zmq
|
||||||
|
|
||||||
|
|
||||||
}; // end class norm_engine_t
|
}; // end class norm_engine_t
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user