// libzmq/src/norm_engine.hpp
#ifndef __ZMQ_NORM_ENGINE_HPP_INCLUDED__
#define __ZMQ_NORM_ENGINE_HPP_INCLUDED__
#if defined ZMQ_HAVE_NORM
#include "io_object.hpp"
#include "i_engine.hpp"
#include "options.hpp"
#include "v2_decoder.hpp"
#include "v2_encoder.hpp"
#include <normApi.h>
namespace zmq
{
class io_thread_t;
class session_base_t;

// Engine implementing the i_engine interface on top of the NORM API
// (see <normApi.h>). It also derives from io_object_t so that it can
// receive in_event() callbacks, which are used for NormEvent notification.
class norm_engine_t : public io_object_t, public i_engine
{
  public:
    norm_engine_t (zmq::io_thread_t *parent_, const options_t &options_);
    ~norm_engine_t ();

    // Create the NORM instance, session, etc.
    // NOTE(review): 'send' / 'recv' presumably select the sender and/or
    // receiver role (cf. is_sender / is_receiver) - confirm in the .cpp.
    int init (const char *network_, bool send, bool recv);
    void shutdown ();

    // i_engine interface implementation.

    // Plug the engine to the session.
    virtual void plug (zmq::io_thread_t *io_thread_,
                       class session_base_t *session_);

    // Terminate and deallocate the engine. Note that 'detached'
    // events are not fired on termination.
    virtual void terminate ();

    // This method is called by the session to signal that more
    // messages can be written to the pipe.
    virtual void restart_input ();

    // This method is called by the session to signal that there
    // are messages available to send.
    virtual void restart_output ();

    // Intentionally a no-op in this engine.
    virtual void zap_msg_available () {}

    virtual const char *get_endpoint () const;

    // i_poll_events interface implementation.
    // (we only need in_event() for NormEvent notification)
    // (i.e., don't have any output events or timers (yet))
    void in_event ();

  private:
    void unplug ();
    void send_data ();
    void recv_data (NormObjectHandle stream);

    enum
    {
        BUFFER_SIZE = 2048
    };

    // Used to keep track of streams from multiple senders.
    // NOTE(review): this class holds raw pointers and declares a
    // destructor but no copy constructor / assignment operator; confirm
    // that instances are never copied.
    class NormRxStreamState
    {
      public:
        NormRxStreamState (NormObjectHandle normStream,
                           int64_t maxMsgSize,
                           bool zeroCopy);
        ~NormRxStreamState ();

        NormObjectHandle GetStreamHandle () const { return norm_stream; }

        bool Init ();

        void SetRxReady (bool state) { rx_ready = state; }
        bool IsRxReady () const { return rx_ready; }

        void SetSync (bool state) { in_sync = state; }
        bool InSync () const { return in_sync; }

        // These are used to feed data to the decoder
        // and its underlying "msg" buffer.

        // Pointer to the first unfilled byte of the buffer.
        char *AccessBuffer ()
        {
            return reinterpret_cast<char *> (buffer_ptr + buffer_count);
        }
        // Number of bytes still missing to fill the buffer.
        size_t GetBytesNeeded () const { return (buffer_size - buffer_count); }
        // Record that 'count' more bytes were written into the buffer.
        void IncrementBufferCount (size_t count) { buffer_count += count; }
        msg_t *AccessMsg () { return zmq_decoder->msg (); }

        // This invokes the decoder "decode" method,
        // returning 0 if more data is needed and
        // 1 if the message is complete. If an error
        // occurs the 'sync' is dropped and the
        // decoder re-initialized.
        int Decode ();

        // Linked list of stream states. The links themselves are the
        // 'prev' / 'next' members of NormRxStreamState, so an item records
        // the list it belongs to in its 'list' member.
        class List
        {
          public:
            List ();
            ~List ();

            void Append (NormRxStreamState &item);
            void Remove (NormRxStreamState &item);

            bool IsEmpty () const { return (NULL == head); }

            void Destroy ();

            // Forward iteration helper over a List.
            class Iterator
            {
              public:
                Iterator (const List &list);
                NormRxStreamState *GetNextItem ();

              private:
                NormRxStreamState *next_item;
            };
            friend class Iterator;

          private:
            NormRxStreamState *head;
            NormRxStreamState *tail;

        }; // end class zmq::norm_engine_t::NormRxStreamState::List

        friend class List;

        // List this stream state is currently linked into.
        List *AccessList () { return list; }

      private:
        NormObjectHandle norm_stream;
        int64_t max_msg_size;
        bool zero_copy;
        bool in_sync;
        bool rx_ready;
        v2_decoder_t *zmq_decoder;
        bool skip_norm_sync;
        unsigned char *buffer_ptr;
        size_t buffer_size;
        size_t buffer_count;

        NormRxStreamState *prev;
        NormRxStreamState *next;
        NormRxStreamState::List *list;

    }; // end class zmq::norm_engine_t::NormRxStreamState

    session_base_t *zmq_session;
    options_t options;
    NormInstanceHandle norm_instance;
    handle_t norm_descriptor_handle;
    NormSessionHandle norm_session;
    bool is_sender;
    bool is_receiver;

    // Sender state
    msg_t tx_msg;
    v2_encoder_t zmq_encoder; // for tx messages (we use v2 for now)
    NormObjectHandle norm_tx_stream;
    bool tx_first_msg;
    bool tx_more_bit;
    bool zmq_output_ready; // zmq has msg(s) to send
    bool norm_tx_ready;    // norm has tx queue vacancy
    // TBD - maybe don't need buffer if can access zmq message buffer directly?
    char tx_buffer[BUFFER_SIZE];
    unsigned int tx_index;
    unsigned int tx_len;

    // Receiver state
    // Lists of norm rx streams from remote senders
    bool zmq_input_ready; // zmq ready to receive msg(s)
    NormRxStreamState::List
      rx_pending_list; // rx streams waiting for data reception
    NormRxStreamState::List
      rx_ready_list; // rx streams ready for NormStreamRead()
    NormRxStreamState::List
      msg_ready_list; // rx streams w/ msg ready for push to zmq

}; // end class norm_engine_t
} // end namespace zmq
#endif // ZMQ_HAVE_NORM
#endif // !__ZMQ_NORM_ENGINE_HPP_INCLUDED__