#ifndef __ZMQ_NORM_ENGINE_HPP_INCLUDED__
#define __ZMQ_NORM_ENGINE_HPP_INCLUDED__

#if defined ZMQ_HAVE_NORM

#include "io_object.hpp"
#include "i_engine.hpp"
#include "options.hpp"
#include "v2_decoder.hpp"
#include "v2_encoder.hpp"

//  NORM API header; provides the NormInstanceHandle, NormSessionHandle
//  and NormObjectHandle types used throughout this class.
#include <normApi.h>

namespace zmq
{
class io_thread_t;
class msg_t;
class session_base_t;

//  i_engine implementation built on top of the NORM API. Outgoing
//  messages are encoded with the v2 encoder and incoming data is decoded
//  with a per-stream v2 decoder (see NormRxStreamState below).
class norm_engine_t : public io_object_t, public i_engine
{
  public:
    norm_engine_t (zmq::io_thread_t *parent_, const options_t &options_);
    ~norm_engine_t ();

    //  Create NORM instance, session, etc.
    //  'send' and 'recv' select whether the engine acts as a sender
    //  and/or a receiver.
    //  NOTE(review): return convention is presumably 0 on success and
    //  -1 on failure - confirm against the implementation.
    int init (const char *network_, bool send, bool recv);

    void shutdown ();

    //  i_engine interface implementation.

    //  Plug the engine to the session.
    virtual void plug (zmq::io_thread_t *io_thread_,
                       zmq::session_base_t *session_);

    //  Terminate and deallocate the engine. Note that 'detached'
    //  events are not fired on termination.
    virtual void terminate ();

    //  This method is called by the session to signal that more
    //  messages can be written to the pipe.
    virtual void restart_input ();

    //  This method is called by the session to signal that there
    //  are messages to send available.
    virtual void restart_output ();

    //  Intentionally a no-op for this engine.
    virtual void zap_msg_available () {}

    virtual const char *get_endpoint () const;

    //  i_poll_events interface implementation.
    //  (we only need in_event() for NormEvent notification)
    //  (i.e., don't have any output events or timers (yet))
    void in_event ();

  private:
    void unplug ();
    void send_data ();
    void recv_data (NormObjectHandle stream);

    //  Size in bytes of the transmit staging buffer (tx_buffer).
    enum
    {
        BUFFER_SIZE = 2048
    };

    //  Used to keep track of streams from multiple senders.
    class NormRxStreamState
    {
      public:
        NormRxStreamState (NormObjectHandle normStream,
                           int64_t maxMsgSize,
                           bool zeroCopy);
        ~NormRxStreamState ();

        NormObjectHandle GetStreamHandle () const { return norm_stream; }

        bool Init ();

        void SetRxReady (bool state) { rx_ready = state; }
        bool IsRxReady () const { return rx_ready; }

        void SetSync (bool state) { in_sync = state; }
        bool InSync () const { return in_sync; }

        //  These are used to feed data to decoder
        //  and its underlying "msg" buffer.

        //  Position in the buffer at which the next received bytes
        //  should be written (buffer_ptr advanced by buffer_count).
        char *AccessBuffer () { return (char *) (buffer_ptr + buffer_count); }

        //  Number of bytes still missing to fill the buffer
        //  (buffer_size - buffer_count).
        size_t GetBytesNeeded () const { return (buffer_size - buffer_count); }

        //  Record that 'count' more bytes were written to the buffer.
        void IncrementBufferCount (size_t count) { buffer_count += count; }

        //  Message object held by the decoder.
        msg_t *AccessMsg () { return zmq_decoder->msg (); }

        //  This invokes the decoder "decode" method
        //  returning 0 if more data is needed,
        //  1 if the message is complete. If an error
        //  occurs the 'sync' is dropped and the
        //  decoder re-initialized.
        int Decode ();

        //  List of stream states with head/tail pointers. Items carry
        //  their own prev/next/list members (declared below).
        //  NOTE(review): presumably the list is threaded through those
        //  members - implementation is not visible in this header.
        class List
        {
          public:
            List ();
            ~List ();

            void Append (NormRxStreamState &item);
            void Remove (NormRxStreamState &item);

            bool IsEmpty () const { return (NULL == head); }

            void Destroy ();

            //  Forward walk over a List; GetNextItem () yields the next
            //  stream state.
            //  NOTE(review): presumably returns NULL once the list is
            //  exhausted - confirm against the implementation.
            class Iterator
            {
              public:
                Iterator (const List &list);
                NormRxStreamState *GetNextItem ();

              private:
                NormRxStreamState *next_item;
            };
            friend class Iterator;

          private:
            NormRxStreamState *head;
            NormRxStreamState *tail;

        }; // end class zmq::norm_engine_t::NormRxStreamState::List

        friend class List;

        //  List this stream state currently refers to via its 'list'
        //  member.
        List *AccessList () { return list; }

      private:
        NormObjectHandle norm_stream;
        int64_t max_msg_size;
        bool zero_copy;
        bool in_sync;
        bool rx_ready;
        v2_decoder_t *zmq_decoder;
        //  NOTE(review): purpose not evident from this header - see the
        //  implementation file.
        bool skip_norm_sync;
        unsigned char *buffer_ptr;
        size_t buffer_size;
        size_t buffer_count;

        //  Linkage used by List.
        NormRxStreamState *prev;
        NormRxStreamState *next;
        NormRxStreamState::List *list;

    }; // end class zmq::norm_engine_t::NormRxStreamState

    session_base_t *zmq_session;
    options_t options;
    NormInstanceHandle norm_instance;
    handle_t norm_descriptor_handle;
    NormSessionHandle norm_session;
    bool is_sender;
    bool is_receiver;

    //  Sender state
    msg_t tx_msg;
    v2_encoder_t zmq_encoder; // for tx messages (we use v2 for now)
    NormObjectHandle norm_tx_stream;
    bool tx_first_msg;
    bool tx_more_bit;
    bool zmq_output_ready; // zmq has msg(s) to send
    bool norm_tx_ready;    // norm has tx queue vacancy
    //  TBD - maybe don't need buffer if can access zmq message buffer
    //  directly?
    char tx_buffer[BUFFER_SIZE];
    unsigned int tx_index;
    unsigned int tx_len;

    //  Receiver state
    //  Lists of norm rx streams from remote senders
    bool zmq_input_ready; // zmq ready to receive msg(s)
    //  rx streams waiting for data reception
    NormRxStreamState::List rx_pending_list;
    //  rx streams ready for NormStreamRead()
    NormRxStreamState::List rx_ready_list;
    //  rx streams w/ msg ready for push to zmq
    NormRxStreamState::List msg_ready_list;

}; // end class norm_engine_t
}

#endif // ZMQ_HAVE_NORM

#endif // !__ZMQ_NORM_ENGINE_HPP_INCLUDED__