mirror of
				https://github.com/zeromq/libzmq.git
				synced 2025-10-26 18:42:43 +01:00 
			
		
		
		
	
		
			
				
	
	
		
			729 lines
		
	
	
		
			26 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			729 lines
		
	
	
		
			26 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
| 
 | |
| #include "platform.hpp"
 | |
| 
 | |
| #if defined ZMQ_HAVE_NORM
 | |
| 
 | |
| #include "norm_engine.hpp"
 | |
| #include "session_base.hpp"
 | |
| #include "v2_protocol.hpp"
 | |
| 
 | |
| zmq::norm_engine_t::norm_engine_t(io_thread_t*     parent_,
 | |
|                                   const options_t& options_)
 | |
|  : io_object_t(parent_), zmq_session(NULL), options(options_),  
 | |
|    norm_instance(NORM_INSTANCE_INVALID), norm_session(NORM_SESSION_INVALID), 
 | |
|    is_sender(false), is_receiver(false),
 | |
|    zmq_encoder(0), norm_tx_stream(NORM_OBJECT_INVALID), 
 | |
|    tx_first_msg(true), tx_more_bit(false), 
 | |
|    zmq_output_ready(false), norm_tx_ready(false), 
 | |
|    tx_index(0), tx_len(0),
 | |
|    zmq_input_ready(false)  
 | |
| {
 | |
|     int rc = tx_msg.init();
 | |
|     errno_assert(0 == rc);
 | |
| }
 | |
| 
 | |
| zmq::norm_engine_t::~norm_engine_t()
 | |
| {
 | |
|     shutdown();  // in case it was not already called
 | |
| }
 | |
| 
 | |
| 
 | |
| int zmq::norm_engine_t::init(const char* network_, bool send, bool recv)
 | |
| {
 | |
|     // Parse the "network_" address int "iface", "addr", and "port"
 | |
|     // norm endpoint format: [id,][<iface>;]<addr>:<port>
 | |
|     // First, look for optional local NormNodeId
 | |
|     // (default NORM_NODE_ANY causes NORM to use host IP addr for NormNodeId)
 | |
|     NormNodeId localId = NORM_NODE_ANY;
 | |
|     const char* ifacePtr = strchr(network_, ',');
 | |
|     if (NULL != ifacePtr)
 | |
|     {
 | |
|         size_t idLen = ifacePtr - network_;
 | |
|         if (idLen > 31) idLen = 31;
 | |
|         char idText[32];
 | |
|         strncpy(idText, network_, idLen);
 | |
|         idText[idLen] = '\0';
 | |
|         localId = (NormNodeId)atoi(idText);
 | |
|         ifacePtr++;
 | |
|     }
 | |
|     else
 | |
|     {
 | |
|         ifacePtr = network_;
 | |
|     }
 | |
|     
 | |
|     // Second, look for optional multicast ifaceName
 | |
|     char ifaceName[256];
 | |
|     const char* addrPtr = strchr(ifacePtr, ';');
 | |
|     if (NULL != addrPtr)
 | |
|     {
 | |
|         size_t ifaceLen = addrPtr - ifacePtr;
 | |
|         if (ifaceLen > 255) ifaceLen = 255;  // return error instead?
 | |
|         strncpy(ifaceName, ifacePtr, ifaceLen);
 | |
|         ifaceName[ifaceLen] = '\0';
 | |
|         ifacePtr = ifaceName;
 | |
|         addrPtr++;
 | |
|     }
 | |
|     else
 | |
|     {
 | |
|         addrPtr = ifacePtr;
 | |
|         ifacePtr = NULL;
 | |
|     }
 | |
|     
 | |
|     // Finally, parse IP address and port number
 | |
|     const char* portPtr = strrchr(addrPtr, ':');
 | |
|     if (NULL == portPtr)
 | |
|     {
 | |
|         errno = EINVAL;
 | |
|         return -1;
 | |
|     }
 | |
|     
 | |
|     char addr[256];
 | |
|     size_t addrLen = portPtr - addrPtr;
 | |
|     if (addrLen > 255) addrLen = 255;
 | |
|     strncpy(addr, addrPtr, addrLen);
 | |
|     addr[addrLen] = '\0';
 | |
|     portPtr++;
 | |
|     unsigned short portNumber = atoi(portPtr);
 | |
|     
 | |
|     if (NORM_INSTANCE_INVALID == norm_instance)
 | |
|     {
 | |
|         if (NORM_INSTANCE_INVALID == (norm_instance = NormCreateInstance()))
 | |
|         {
 | |
|             // errno set by whatever caused NormCreateInstance() to fail
 | |
|             return -1;
 | |
|         }
 | |
|     }
 | |
|     
 | |
|     // TBD - What do we use for our local NormNodeId?
 | |
|     //       (for now we use automatic, IP addr based assignment or passed in 'id')
 | |
|     //       a) Use ZMQ Identity somehow?
 | |
|     //       b) Add function to use iface addr
 | |
|     //       c) Randomize and implement a NORM session layer
 | |
|     //          conflict detection/resolution protocol
 | |
|     
 | |
|     norm_session = NormCreateSession(norm_instance, addr, portNumber, localId);
 | |
|     if (NORM_SESSION_INVALID == norm_session)
 | |
|     {
 | |
|         int savedErrno = errno;
 | |
|         NormDestroyInstance(norm_instance);
 | |
|         norm_instance = NORM_INSTANCE_INVALID;
 | |
|         errno = savedErrno;
 | |
|         return -1;
 | |
|     }
 | |
|     // There's many other useful NORM options that could be applied here
 | |
|     if (NormIsUnicastAddress(addr))
 | |
|     {
 | |
|         NormSetDefaultUnicastNack(norm_session, true);
 | |
|     }
 | |
|     else
 | |
|     {
 | |
|         // These only apply for multicast sessions
 | |
|         //NormSetTTL(norm_session, options.multicast_hops);  // ZMQ default is 1
 | |
|         NormSetTTL(norm_session, 255);  // since the ZMQ_MULTICAST_HOPS socket option isn't well-supported
 | |
|         NormSetRxPortReuse(norm_session, true);  // port reuse doesn't work for non-connected unicast
 | |
|         NormSetLoopback(norm_session, true);  // needed when multicast users on same machine
 | |
|         if (NULL != ifacePtr)
 | |
|         {
 | |
|             // Note a bad interface may not be caught until sender or receiver start
 | |
|             // (Since sender/receiver is not yet started, this always succeeds here)
 | |
|             NormSetMulticastInterface(norm_session, ifacePtr);
 | |
|         }
 | |
|     }
 | |
|     
 | |
|     if (recv)
 | |
|     {
 | |
|         // The alternative NORM_SYNC_CURRENT here would provide "instant"
 | |
|         // receiver sync to the sender's _current_ message transmission.
 | |
|         // NORM_SYNC_STREAM tries to get everything the sender has cached/buffered
 | |
|         NormSetDefaultSyncPolicy(norm_session, NORM_SYNC_STREAM);
 | |
|         if (!NormStartReceiver(norm_session, 2*1024*1024))
 | |
|         {
 | |
|             // errno set by whatever failed
 | |
|             int savedErrno = errno;
 | |
|             NormDestroyInstance(norm_instance); // session gets closed, too
 | |
|             norm_session = NORM_SESSION_INVALID;
 | |
|             norm_instance = NORM_INSTANCE_INVALID;
 | |
|             errno = savedErrno;
 | |
|             return -1;
 | |
|         }
 | |
|         is_receiver = true;
 | |
|     }
 | |
|     
 | |
|     if (send)
 | |
|     {
 | |
|         // Pick a random sender instance id (aka norm sender session id)
 | |
|         NormSessionId instanceId = NormGetRandomSessionId();
 | |
|         // TBD - provide "options" for some NORM sender parameters
 | |
|         if (!NormStartSender(norm_session, instanceId, 2*1024*1024, 1400, 16, 4))
 | |
|         {
 | |
|             // errno set by whatever failed
 | |
|             int savedErrno = errno;
 | |
|             NormDestroyInstance(norm_instance); // session gets closed, too
 | |
|             norm_session = NORM_SESSION_INVALID;
 | |
|             norm_instance = NORM_INSTANCE_INVALID;
 | |
|             errno = savedErrno;
 | |
|             return -1;
 | |
|         }    
 | |
|         NormSetCongestionControl(norm_session, true);
 | |
|         norm_tx_ready = true;
 | |
|         is_sender = true;   
 | |
|         if (NORM_OBJECT_INVALID == (norm_tx_stream = NormStreamOpen(norm_session, 2*1024*1024)))
 | |
|         {
 | |
|             // errno set by whatever failed
 | |
|             int savedErrno = errno;
 | |
|             NormDestroyInstance(norm_instance); // session gets closed, too
 | |
|             norm_session = NORM_SESSION_INVALID;
 | |
|             norm_instance = NORM_INSTANCE_INVALID;
 | |
|             errno = savedErrno;
 | |
|             return -1;
 | |
|         }
 | |
|     }
 | |
|     
 | |
|     //NormSetMessageTrace(norm_session, true);
 | |
|     //NormSetDebugLevel(3);
 | |
|     //NormOpenDebugLog(norm_instance, "normLog.txt");
 | |
|     
 | |
|     return 0;  // no error
 | |
| }  // end zmq::norm_engine_t::init()
 | |
| 
 | |
| void zmq::norm_engine_t::shutdown()
 | |
| {
 | |
|     // TBD - implement a more graceful shutdown option
 | |
|     if (is_receiver)
 | |
|     {
 | |
|         NormStopReceiver(norm_session);
 | |
|         
 | |
|         // delete any active NormRxStreamState
 | |
|         rx_pending_list.Destroy();
 | |
|         rx_ready_list.Destroy();
 | |
|         msg_ready_list.Destroy();
 | |
|         
 | |
|         is_receiver = false;
 | |
|     }
 | |
|     if (is_sender)
 | |
|     {
 | |
|         NormStopSender(norm_session);
 | |
|         is_sender = false;
 | |
|     }
 | |
|     if (NORM_SESSION_INVALID != norm_session)
 | |
|     {
 | |
|         NormDestroySession(norm_session);
 | |
|         norm_session = NORM_SESSION_INVALID;
 | |
|     }
 | |
|     if (NORM_INSTANCE_INVALID != norm_instance)
 | |
|     {
 | |
|         NormStopInstance(norm_instance);
 | |
|         NormDestroyInstance(norm_instance);
 | |
|         norm_instance = NORM_INSTANCE_INVALID;
 | |
|     }
 | |
| }  // end zmq::norm_engine_t::shutdown()
 | |
| 
 | |
| void zmq::norm_engine_t::plug (io_thread_t* io_thread_, session_base_t *session_)
 | |
| {
 | |
|     // TBD - we may assign the NORM engine to an io_thread in the future???
 | |
|     zmq_session = session_;
 | |
|     if (is_sender) zmq_output_ready = true;
 | |
|     if (is_receiver) zmq_input_ready = true;
 | |
|     
 | |
|     fd_t normDescriptor = NormGetDescriptor(norm_instance);
 | |
|     norm_descriptor_handle = add_fd(normDescriptor);
 | |
|     // Set POLLIN for notification of pending NormEvents
 | |
|     set_pollin(norm_descriptor_handle); 
 | |
|     
 | |
|     if (is_sender) send_data();
 | |
|     
 | |
| }  // end zmq::norm_engine_t::init()
 | |
| 
 | |
| void zmq::norm_engine_t::unplug()
 | |
| {
 | |
|     rm_fd(norm_descriptor_handle);
 | |
|     
 | |
|     zmq_session = NULL;
 | |
| }  // end zmq::norm_engine_t::unplug()
 | |
| 
 | |
| void zmq::norm_engine_t::terminate()
 | |
| {
 | |
|     unplug();
 | |
|     shutdown();
 | |
|     delete this;
 | |
| }
 | |
| 
 | |
| void zmq::norm_engine_t::restart_output()
 | |
| {
 | |
|     // There's new message data available from the session
 | |
|     zmq_output_ready = true;
 | |
|     if (norm_tx_ready) send_data();
 | |
|     
 | |
| }  // end zmq::norm_engine_t::restart_output()
 | |
| 
 | |
| void zmq::norm_engine_t::send_data()
 | |
| {
 | |
|     // Here we write as much as is available or we can
 | |
|     while (zmq_output_ready && norm_tx_ready)
 | |
|     {
 | |
|         if (0 == tx_len)
 | |
|         {
 | |
|             // Our tx_buffer needs data to send
 | |
|             // Get more data from encoder
 | |
|             size_t space = BUFFER_SIZE;
 | |
|             unsigned char* bufPtr = (unsigned char*)tx_buffer;
 | |
|             tx_len = zmq_encoder.encode(&bufPtr, space);
 | |
|             if (0 == tx_len)
 | |
|             {
 | |
|                 if (tx_first_msg)
 | |
|                 {
 | |
|                     // We don't need to mark eom/flush until a message is sent
 | |
|                     tx_first_msg = false;
 | |
|                 }
 | |
|                 else
 | |
|                 {
 | |
|                     // A prior message was completely written to stream, so
 | |
|                     // mark end-of-message and possibly flush (to force packet transmission,
 | |
|                     // even if it's not a full segment so message gets delivered quickly)
 | |
|                     // NormStreamMarkEom(norm_tx_stream);  // the flush below marks eom
 | |
|                     // Note NORM_FLUSH_ACTIVE makes NORM fairly chatty for low duty cycle messaging
 | |
|                     // but makes sure content is delivered quickly.  Positive acknowledgements
 | |
|                     // with flush override would make NORM more succinct here
 | |
|                     NormStreamFlush(norm_tx_stream, true, NORM_FLUSH_ACTIVE);
 | |
|                 }
 | |
|                 // Need to pull and load a new message to send
 | |
|                 if (-1 == zmq_session->pull_msg(&tx_msg))
 | |
|                 {
 | |
|                     // We need to wait for "restart_output()" to be called by ZMQ 
 | |
|                     zmq_output_ready = false;
 | |
|                     break;
 | |
|                 }
 | |
|                 zmq_encoder.load_msg(&tx_msg);
 | |
|                 // Should we write message size header for NORM to use? Or expect NORM
 | |
|                 // receiver to decode ZMQ message framing format(s)?
 | |
|                 // OK - we need to use a byte to denote when the ZMQ frame is the _first_
 | |
|                 //      frame of a message so it can be decoded properly when a receiver
 | |
|                 //      'syncs' mid-stream.  We key off the the state of the 'more_flag'
 | |
|                 //      I.e.,If  more_flag _was_ false previously, this is the first
 | |
|                 //      frame of a ZMQ message.
 | |
|                 if (tx_more_bit) 
 | |
|                     tx_buffer[0] = (char)0xff;  // this is not first frame of message
 | |
|                 else
 | |
|                     tx_buffer[0] = 0x00;  // this is first frame of message
 | |
|                 tx_more_bit = (0 != (tx_msg.flags() & msg_t::more));
 | |
|                 // Go ahead an get a first chunk of the message
 | |
|                 bufPtr++;
 | |
|                 space--;
 | |
|                 tx_len = 1 + zmq_encoder.encode(&bufPtr, space);
 | |
|                 tx_index = 0;
 | |
|             }
 | |
|         }
 | |
|         // Do we have data in our tx_buffer pending
 | |
|         if (tx_index < tx_len)
 | |
|         {
 | |
|             // We have data in our tx_buffer to send, so write it to the stream
 | |
|             tx_index += NormStreamWrite(norm_tx_stream, tx_buffer + tx_index, tx_len - tx_index);
 | |
|             if (tx_index < tx_len)
 | |
|             {
 | |
|                 // NORM stream buffer full, wait for NORM_TX_QUEUE_VACANCY
 | |
|                 norm_tx_ready = false;
 | |
|                 break;
 | |
|             }
 | |
|             tx_len = 0;  // all buffered data was written
 | |
|         }
 | |
|     }  // end while (zmq_output_ready && norm_tx_ready)
 | |
| }  // end zmq::norm_engine_t::send_data()
 | |
| 
 | |
| void zmq::norm_engine_t::in_event()
 | |
| {
 | |
|     // This means a NormEvent is pending, so call NormGetNextEvent() and handle
 | |
|     NormEvent event;
 | |
|     if (!NormGetNextEvent(norm_instance, &event))
 | |
|     {
 | |
|         // NORM has died before we unplugged?!
 | |
|         zmq_assert(false);
 | |
|         return;
 | |
|     }
 | |
|     
 | |
|     switch(event.type)
 | |
|     {
 | |
|         case NORM_TX_QUEUE_VACANCY:
 | |
|         case NORM_TX_QUEUE_EMPTY:
 | |
|             if (!norm_tx_ready)
 | |
|             {
 | |
|                 norm_tx_ready = true;
 | |
|                 send_data();
 | |
|             }
 | |
|             break;
 | |
|             
 | |
|         case NORM_RX_OBJECT_NEW:
 | |
|             //break;
 | |
|         case NORM_RX_OBJECT_UPDATED:
 | |
|             recv_data(event.object);
 | |
|             break;
 | |
|             
 | |
|         case NORM_RX_OBJECT_ABORTED:
 | |
|         {
 | |
|             NormRxStreamState* rxState = (NormRxStreamState*)NormObjectGetUserData(event.object);
 | |
|             if (NULL != rxState)
 | |
|             {
 | |
|                 // Remove the state from the list it's in
 | |
|                 // This is now unnecessary since deletion takes care of list removal
 | |
|                 // but in the interest of being clear ...
 | |
|                 NormRxStreamState::List* list = rxState->AccessList();
 | |
|                 if (NULL != list) list->Remove(*rxState);
 | |
|             }
 | |
|             delete rxState;
 | |
|             break;
 | |
|         }           
 | |
|         case NORM_REMOTE_SENDER_INACTIVE:
 | |
|             // Here we free resources used for this formerly active sender.
 | |
|             // Note w/ NORM_SYNC_STREAM, if sender reactivates, we may
 | |
|             //  get some messages delivered twice.  NORM_SYNC_CURRENT would
 | |
|             // mitigate that but might miss data at startup. Always tradeoffs.
 | |
|             // Instead of immediately deleting, we could instead initiate a
 | |
|             // user configurable timeout here to wait some amount of time
 | |
|             // after this event to declare the remote sender truly dead
 | |
|             // and delete its state???
 | |
|             NormNodeDelete(event.sender);  
 | |
|             break;
 | |
|             
 | |
|         default:
 | |
|             // We ignore some NORM events 
 | |
|             break;
 | |
|     }
 | |
| }  // zmq::norm_engine_t::in_event()
 | |
| 
 | |
| void zmq::norm_engine_t::restart_input()
 | |
| {
 | |
|     // TBD - should we check/assert that zmq_input_ready was false???
 | |
|     zmq_input_ready = true;
 | |
|     // Process any pending received messages
 | |
|     if (!msg_ready_list.IsEmpty())
 | |
|         recv_data(NORM_OBJECT_INVALID);
 | |
|     
 | |
| }  // end zmq::norm_engine_t::restart_input()
 | |
| 
 | |
| void zmq::norm_engine_t::recv_data(NormObjectHandle object)
 | |
| {
 | |
|     if (NORM_OBJECT_INVALID != object)
 | |
|     {
 | |
|         // Call result of NORM_RX_OBJECT_UPDATED notification
 | |
|         // This is a rx_ready indication for a new or existing rx stream
 | |
|         // First, determine if this is a stream we already know
 | |
|         zmq_assert(NORM_OBJECT_STREAM == NormObjectGetType(object));
 | |
|         // Since there can be multiple senders (publishers), we keep
 | |
|         // state for each separate rx stream.
 | |
|         NormRxStreamState* rxState = (NormRxStreamState*)NormObjectGetUserData(object);
 | |
|         if (NULL == rxState)
 | |
|         {
 | |
|             // This is a new stream, so create rxState with zmq decoder, etc
 | |
|             rxState = new NormRxStreamState(object, options.maxmsgsize);
 | |
|             if (!rxState->Init())
 | |
|             {
 | |
|                 errno_assert(false);
 | |
|                 delete rxState;
 | |
|                 return;
 | |
|             }
 | |
|             NormObjectSetUserData(object, rxState);
 | |
|         }
 | |
|         else if (!rxState->IsRxReady())
 | |
|         {
 | |
|             // Existing non-ready stream, so remove from pending
 | |
|             // list to be promoted to rx_ready_list ...
 | |
|             rx_pending_list.Remove(*rxState);
 | |
|         }
 | |
|         if (!rxState->IsRxReady())
 | |
|         {
 | |
|             // TBD - prepend up front for immediate service?
 | |
|             rxState->SetRxReady(true);
 | |
|             rx_ready_list.Append(*rxState);
 | |
|         }
 | |
|     }
 | |
|     // This loop repeats until we've read all data available from "rx ready" inbound streams
 | |
|     // and pushed any accumulated messages we can up to the zmq session.
 | |
|     while (!rx_ready_list.IsEmpty() || (zmq_input_ready && !msg_ready_list.IsEmpty()))
 | |
|     {
 | |
|         // Iterate through our rx_ready streams, reading data into the decoder
 | |
|         // (This services incoming "rx ready" streams in a round-robin fashion)
 | |
|         NormRxStreamState::List::Iterator iterator(rx_ready_list);
 | |
|         NormRxStreamState* rxState;
 | |
|         while (NULL != (rxState = iterator.GetNextItem()))
 | |
|         {
 | |
|             switch(rxState->Decode())
 | |
|             {
 | |
|                 case 1:  // msg completed   
 | |
|                     // Complete message decoded, move this stream to msg_ready_list
 | |
|                     // to push the message up to the session below.  Note the stream 
 | |
|                     // will be returned to the "rx_ready_list" after that's done
 | |
|                     rx_ready_list.Remove(*rxState);
 | |
|                     msg_ready_list.Append(*rxState);
 | |
|                     continue;
 | |
|                     
 | |
|                 case -1: // decoding error (shouldn't happen w/ NORM, but ...)
 | |
|                     // We need to re-sync this stream (decoder buffer was reset)
 | |
|                     rxState->SetSync(false);
 | |
|                     break;
 | |
|                     
 | |
|                 default:  // 0 - need more data
 | |
|                     break;
 | |
|             }
 | |
|             // Get more data from this stream
 | |
|             NormObjectHandle stream = rxState->GetStreamHandle();
 | |
|             // First, make sure we're in sync ...
 | |
|             while (!rxState->InSync())
 | |
|             {
 | |
|                 // seek NORM message start
 | |
|                 if (!NormStreamSeekMsgStart(stream))
 | |
|                 {
 | |
|                     // Need to wait for more data
 | |
|                     break;
 | |
|                 }
 | |
|                 // read message 'flag' byte to see if this it's a 'final' frame
 | |
|                 char syncFlag;
 | |
|                 unsigned int numBytes = 1;
 | |
|                 if (!NormStreamRead(stream, &syncFlag, &numBytes))
 | |
|                 {
 | |
|                     // broken stream (shouldn't happen after seek msg start?)
 | |
|                     zmq_assert(false);
 | |
|                     continue;
 | |
|                 }
 | |
|                 if (0 == numBytes)
 | |
|                 {
 | |
|                     // This probably shouldn't happen either since we found msg start
 | |
|                     // Need to wait for more data
 | |
|                     break;
 | |
|                 }
 | |
|                 if (0 == syncFlag) rxState->SetSync(true);
 | |
|                 // else keep seeking ...
 | |
|             }  // end while(!rxState->InSync())
 | |
|             if (!rxState->InSync())
 | |
|             {
 | |
|                 // Need more data for this stream, so remove from "rx ready"
 | |
|                 // list and iterate to next "rx ready" stream
 | |
|                 rxState->SetRxReady(false);
 | |
|                 // Move from rx_ready_list to rx_pending_list
 | |
|                 rx_ready_list.Remove(*rxState);
 | |
|                 rx_pending_list.Append(*rxState);
 | |
|                 continue;
 | |
|             }
 | |
|             // Now we're actually ready to read data from the NORM stream to the zmq_decoder
 | |
|             // the underlying zmq_decoder->get_buffer() call sets how much is needed.
 | |
|             unsigned int numBytes = rxState->GetBytesNeeded();
 | |
|             if (!NormStreamRead(stream, rxState->AccessBuffer(), &numBytes))
 | |
|             {
 | |
|                 // broken NORM stream, so re-sync
 | |
|                 rxState->Init();  // TBD - check result
 | |
|                 // This will retry syncing, and getting data from this stream
 | |
|                 // since we don't increment the "it" iterator
 | |
|                 continue;
 | |
|             }
 | |
|             rxState->IncrementBufferCount(numBytes);
 | |
|             if (0 == numBytes)
 | |
|             {
 | |
|                 // All the data available has been read
 | |
|                 // Need to wait for NORM_RX_OBJECT_UPDATED for this stream
 | |
|                 rxState->SetRxReady(false);
 | |
|                 // Move from rx_ready_list to rx_pending_list
 | |
|                 rx_ready_list.Remove(*rxState);
 | |
|                 rx_pending_list.Append(*rxState);
 | |
|             }
 | |
|         }  // end while(NULL != (rxState = iterator.GetNextItem()))
 | |
|         
 | |
|         if (zmq_input_ready)
 | |
|         {
 | |
|             // At this point, we've made a pass through the "rx_ready" stream list
 | |
|             // Now make a pass through the "msg_pending" list (if the zmq session 
 | |
|             // ready for more input).  This may possibly return streams back to 
 | |
|             // the "rx ready" stream list after their pending message is handled
 | |
|             NormRxStreamState::List::Iterator iterator(msg_ready_list);
 | |
|             NormRxStreamState* rxState;
 | |
|             while (NULL != (rxState = iterator.GetNextItem()))
 | |
|             {
 | |
|                 msg_t* msg = rxState->AccessMsg();
 | |
|                 int rc = zmq_session->push_msg(msg);
 | |
|                 if (-1 == rc)
 | |
|                 {
 | |
|                     if (EAGAIN == errno)
 | |
|                     {
 | |
|                         // need to wait until session calls "restart_input()"
 | |
|                         zmq_input_ready = false;
 | |
|                         break;
 | |
|                     }
 | |
|                     else
 | |
|                     {
 | |
|                         // session rejected message?
 | |
|                         // TBD - handle this better
 | |
|                         zmq_assert(false); 
 | |
|                     }
 | |
|                 }
 | |
|                 // else message was accepted.
 | |
|                 msg_ready_list.Remove(*rxState);
 | |
|                 if (rxState->IsRxReady())  // Move back to "rx_ready" list to read more data
 | |
|                     rx_ready_list.Append(*rxState);
 | |
|                 else  // Move back to "rx_pending" list until NORM_RX_OBJECT_UPDATED
 | |
|                     msg_ready_list.Append(*rxState);
 | |
|             }  // end while(NULL != (rxState = iterator.GetNextItem()))
 | |
|         }  // end if (zmq_input_ready)
 | |
|     }  // end while ((!rx_ready_list.empty() || (zmq_input_ready && !msg_ready_list.empty()))
 | |
|     
 | |
|     // Alert zmq of the messages we have pushed up
 | |
|     zmq_session->flush();
 | |
|     
 | |
| }  // end zmq::norm_engine_t::recv_data()
 | |
| 
 | |
| zmq::norm_engine_t::NormRxStreamState::NormRxStreamState(NormObjectHandle normStream, 
 | |
|                                                          int64_t          maxMsgSize)
 | |
|  : norm_stream(normStream), max_msg_size(maxMsgSize), 
 | |
|    in_sync(false), rx_ready(false), zmq_decoder(NULL), skip_norm_sync(false),
 | |
|    buffer_ptr(NULL), buffer_size(0), buffer_count(0),
 | |
|    prev(NULL), next(NULL), list(NULL)
 | |
| {
 | |
| }
 | |
| 
 | |
| zmq::norm_engine_t::NormRxStreamState::~NormRxStreamState()
 | |
| {
 | |
|     if (NULL != zmq_decoder)
 | |
|     {
 | |
|         delete zmq_decoder;
 | |
|         zmq_decoder = NULL;
 | |
|     }
 | |
|     if (NULL != list)
 | |
|     {
 | |
|         list->Remove(*this);
 | |
|         list = NULL;
 | |
|     }
 | |
| }
 | |
| 
 | |
| bool zmq::norm_engine_t::NormRxStreamState::Init()
 | |
| {
 | |
|     in_sync = false;
 | |
|     skip_norm_sync = false;
 | |
|     if (NULL != zmq_decoder) delete zmq_decoder;
 | |
|     // Note "in_batch_size" comes from config.h
 | |
|     zmq_decoder = new (std::nothrow) v2_decoder_t (in_batch_size, max_msg_size);
 | |
|     alloc_assert (zmq_decoder);
 | |
|     if (NULL != zmq_decoder)
 | |
|     {
 | |
|         buffer_count = 0;
 | |
|         buffer_size = 0;
 | |
|         zmq_decoder->get_buffer(&buffer_ptr, &buffer_size);
 | |
|         return true;
 | |
|     }
 | |
|     else
 | |
|     {
 | |
|         return false;
 | |
|     }
 | |
| }  // end zmq::norm_engine_t::NormRxStreamState::Init()
 | |
| 
 | |
| // This decodes any pending data sitting in our stream decoder buffer
 | |
| // It returns 1 upon message completion, -1 on error, 1 on msg completion
 | |
| int zmq::norm_engine_t::NormRxStreamState::Decode()
 | |
| {
 | |
|     // If we have pending bytes to decode, process those first
 | |
|     while (buffer_count > 0)
 | |
|     {
 | |
|         // There's pending data for the decoder to decode
 | |
|         size_t processed = 0;
 | |
|         
 | |
|         // This a bit of a kludgy approach used to weed
 | |
|         // out the NORM ZMQ message transport "syncFlag" byte
 | |
|         // from the ZMQ message stream being decoded (but it works!)
 | |
|         if (skip_norm_sync) 
 | |
|         {
 | |
|             buffer_ptr++;
 | |
|             buffer_count--;
 | |
|             skip_norm_sync = false;
 | |
|         }
 | |
|         
 | |
|         int rc = zmq_decoder->decode(buffer_ptr, buffer_count, processed);
 | |
|         buffer_ptr += processed;
 | |
|         buffer_count -= processed;
 | |
|         switch (rc)
 | |
|         {
 | |
|             case 1:
 | |
|                 // msg completed
 | |
|                 if (0 == buffer_count)
 | |
|                 {
 | |
|                     buffer_size = 0;
 | |
|                     zmq_decoder->get_buffer(&buffer_ptr, &buffer_size);
 | |
|                 }
 | |
|                 skip_norm_sync = true;
 | |
|                 return 1;
 | |
|             case -1:
 | |
|                 // decoder error (reset decoder and state variables)
 | |
|                 in_sync = false;
 | |
|                 skip_norm_sync = false;  // will get consumed by norm sync check
 | |
|                 Init();
 | |
|                 break;
 | |
|                 
 | |
|             case 0:
 | |
|                 // need more data, keep decoding until buffer exhausted
 | |
|                 break;
 | |
|         }
 | |
|     }
 | |
|     // Reset buffer pointer/count for next read
 | |
|     buffer_count = 0;
 | |
|     buffer_size = 0;
 | |
|     zmq_decoder->get_buffer(&buffer_ptr, &buffer_size);
 | |
|     return 0;  //  need more data
 | |
|     
 | |
| }  // end zmq::norm_engine_t::NormRxStreamState::Decode()
 | |
| 
 | |
| zmq::norm_engine_t::NormRxStreamState::List::List()
 | |
|  : head(NULL), tail(NULL)
 | |
| {
 | |
| }
 | |
| 
 | |
| zmq::norm_engine_t::NormRxStreamState::List::~List()
 | |
| {
 | |
|     Destroy();
 | |
| }
 | |
| 
 | |
| void zmq::norm_engine_t::NormRxStreamState::List::Destroy()
 | |
| {
 | |
|     NormRxStreamState* item = head;
 | |
|     while (NULL != item)
 | |
|     {
 | |
|         Remove(*item);
 | |
|         delete item;
 | |
|         item = head;
 | |
|     }
 | |
| }  // end zmq::norm_engine_t::NormRxStreamState::List::Destroy()
 | |
| 
 | |
| void zmq::norm_engine_t::NormRxStreamState::List::Append(NormRxStreamState& item)
 | |
| {
 | |
|     item.prev = tail;
 | |
|     if (NULL != tail)
 | |
|         tail->next = &item;
 | |
|     else
 | |
|         head = &item;
 | |
|     item.next = NULL;
 | |
|     tail = &item;
 | |
|     item.list = this;
 | |
| }  // end zmq::norm_engine_t::NormRxStreamState::List::Append()
 | |
| 
 | |
| void zmq::norm_engine_t::NormRxStreamState::List::Remove(NormRxStreamState& item)
 | |
| {
 | |
|     if (NULL != item.prev)
 | |
|         item.prev->next = item.next;
 | |
|     else
 | |
|         head = item.next;
 | |
|     if (NULL != item.next)
 | |
|         item.next ->prev = item.prev;
 | |
|     else
 | |
|         tail = item.prev;
 | |
|     item.prev = item.next = NULL;
 | |
|     item.list = NULL;
 | |
| }  // end zmq::norm_engine_t::NormRxStreamState::List::Remove()
 | |
| 
 | |
| zmq::norm_engine_t::NormRxStreamState::List::Iterator::Iterator(const List& list)
 | |
|  : next_item(list.head)
 | |
| {
 | |
| }
 | |
| 
 | |
| zmq::norm_engine_t::NormRxStreamState* zmq::norm_engine_t::NormRxStreamState::List::Iterator::GetNextItem()
 | |
| {
 | |
|     NormRxStreamState* nextItem = next_item;
 | |
|     if (NULL != nextItem) next_item = nextItem->next;
 | |
|     return nextItem;
 | |
| }  // end zmq::norm_engine_t::NormRxStreamState::List::Iterator::GetNextItem()
 | |
|     
 | |
| 
 | |
| #endif // ZMQ_HAVE_NORM
 | 
