mirror of
https://github.com/zeromq/libzmq.git
synced 2024-12-14 11:06:59 +01:00
commit
734a73c63d
@ -107,7 +107,7 @@ namespace zmq
|
|||||||
// Cookie received from server
|
// Cookie received from server
|
||||||
uint8_t cn_cookie [16 + 80];
|
uint8_t cn_cookie [16 + 80];
|
||||||
|
|
||||||
// Intermediary buffer used to seepd up boxing and unboxing.
|
// Intermediary buffer used to speed up boxing and unboxing.
|
||||||
uint8_t cn_precom [crypto_box_BEFORENMBYTES];
|
uint8_t cn_precom [crypto_box_BEFORENMBYTES];
|
||||||
|
|
||||||
// Nonce
|
// Nonce
|
||||||
|
@ -53,7 +53,7 @@ namespace zmq
|
|||||||
// This class implements the state machine that parses the incoming buffer.
|
// This class implements the state machine that parses the incoming buffer.
|
||||||
// Derived class should implement individual state machine actions.
|
// Derived class should implement individual state machine actions.
|
||||||
//
|
//
|
||||||
// Buffer managment is done by an allocator policy.
|
// Buffer management is done by an allocator policy.
|
||||||
template <typename T, typename A = c_single_allocator>
|
template <typename T, typename A = c_single_allocator>
|
||||||
class decoder_base_t : public i_decoder
|
class decoder_base_t : public i_decoder
|
||||||
{
|
{
|
||||||
@ -99,11 +99,11 @@ namespace zmq
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Processes the data in the buffer previously allocated using
|
// Processes the data in the buffer previously allocated using
|
||||||
// get_buffer function. size_ argument specifies nemuber of bytes
|
// get_buffer function. size_ argument specifies number of bytes
|
||||||
// actually filled into the buffer. Function returns 1 when the
|
// actually filled into the buffer. Function returns 1 when the
|
||||||
// whole message was decoded or 0 when more data is required.
|
// whole message was decoded or 0 when more data is required.
|
||||||
// On error, -1 is returned and errno set accordingly.
|
// On error, -1 is returned and errno set accordingly.
|
||||||
// Number of bytes processed is returned in byts_used_.
|
// Number of bytes processed is returned in bytes_used_.
|
||||||
int decode (const unsigned char *data_, std::size_t size_,
|
int decode (const unsigned char *data_, std::size_t size_,
|
||||||
std::size_t &bytes_used_)
|
std::size_t &bytes_used_)
|
||||||
{
|
{
|
||||||
|
@ -80,7 +80,7 @@ namespace zmq
|
|||||||
c_single_allocator& operator = (c_single_allocator const&);
|
c_single_allocator& operator = (c_single_allocator const&);
|
||||||
};
|
};
|
||||||
|
|
||||||
// This allocater allocates a reference counted buffer which is used by v2_decoder_t
|
// This allocator allocates a reference counted buffer which is used by v2_decoder_t
|
||||||
// to use zero-copy msg::init_data to create messages with memory from this buffer as
|
// to use zero-copy msg::init_data to create messages with memory from this buffer as
|
||||||
// data storage.
|
// data storage.
|
||||||
//
|
//
|
||||||
@ -102,7 +102,7 @@ namespace zmq
|
|||||||
// Allocate a new buffer
|
// Allocate a new buffer
|
||||||
//
|
//
|
||||||
// This releases the current buffer to be bound to the lifetime of the messages
|
// This releases the current buffer to be bound to the lifetime of the messages
|
||||||
// created on this bufer.
|
// created on this buffer.
|
||||||
unsigned char* allocate ();
|
unsigned char* allocate ();
|
||||||
|
|
||||||
// force deallocation of buffer.
|
// force deallocation of buffer.
|
||||||
|
@ -148,7 +148,7 @@ int zmq::dist_t::send_to_matching (msg_t *msg_)
|
|||||||
// Push the message to matching pipes.
|
// Push the message to matching pipes.
|
||||||
distribute (msg_);
|
distribute (msg_);
|
||||||
|
|
||||||
// If mutlipart message is fully sent, activate all the eligible pipes.
|
// If multipart message is fully sent, activate all the eligible pipes.
|
||||||
if (!msg_more)
|
if (!msg_more)
|
||||||
active = eligible;
|
active = eligible;
|
||||||
|
|
||||||
|
@ -80,7 +80,7 @@ namespace zmq
|
|||||||
// there are following parts still waiting in the current pipe.
|
// there are following parts still waiting in the current pipe.
|
||||||
bool more;
|
bool more;
|
||||||
|
|
||||||
// Holds credential after the last_acive_pipe has terminated.
|
// Holds credential after the last_active_pipe has terminated.
|
||||||
blob_t saved_credential;
|
blob_t saved_credential;
|
||||||
|
|
||||||
fq_t (const fq_t&);
|
fq_t (const fq_t&);
|
||||||
|
@ -49,7 +49,7 @@ namespace zmq
|
|||||||
virtual void resize_buffer(size_t) = 0;
|
virtual void resize_buffer(size_t) = 0;
|
||||||
// Decodes data pointed to by data_.
|
// Decodes data pointed to by data_.
|
||||||
// When a message is decoded, 1 is returned.
|
// When a message is decoded, 1 is returned.
|
||||||
// When the decoder needs more data, 0 is returnd.
|
// When the decoder needs more data, 0 is returned.
|
||||||
// On error, -1 is returned and errno is set accordingly.
|
// On error, -1 is returned and errno is set accordingly.
|
||||||
virtual int decode (const unsigned char *data_, size_t size_,
|
virtual int decode (const unsigned char *data_, size_t size_,
|
||||||
size_t &processed) = 0;
|
size_t &processed) = 0;
|
||||||
|
@ -52,7 +52,7 @@ namespace zmq
|
|||||||
|
|
||||||
io_thread_t (zmq::ctx_t *ctx_, uint32_t tid_);
|
io_thread_t (zmq::ctx_t *ctx_, uint32_t tid_);
|
||||||
|
|
||||||
// Clean-up. If the thread was started, it's neccessary to call 'stop'
|
// Clean-up. If the thread was started, it's necessary to call 'stop'
|
||||||
// before invoking destructor. Otherwise the destructor would hang up.
|
// before invoking destructor. Otherwise the destructor would hang up.
|
||||||
~io_thread_t ();
|
~io_thread_t ();
|
||||||
|
|
||||||
@ -70,7 +70,7 @@ namespace zmq
|
|||||||
void out_event ();
|
void out_event ();
|
||||||
void timer_event (int id_);
|
void timer_event (int id_);
|
||||||
|
|
||||||
// Used by io_objects to retrieve the assciated poller object.
|
// Used by io_objects to retrieve the associated poller object.
|
||||||
poller_t *get_poller ();
|
poller_t *get_poller ();
|
||||||
|
|
||||||
// Command handlers.
|
// Command handlers.
|
||||||
|
@ -103,7 +103,7 @@ void zmq::ipc_connecter_t::process_term (int linger_)
|
|||||||
|
|
||||||
void zmq::ipc_connecter_t::in_event ()
|
void zmq::ipc_connecter_t::in_event ()
|
||||||
{
|
{
|
||||||
// We are not polling for incomming data, so we are actually called
|
// We are not polling for incoming data, so we are actually called
|
||||||
// because of error here. However, we can get error on out event as well
|
// because of error here. However, we can get error on out event as well
|
||||||
// on some platforms, so we'll simply handle both events in the same way.
|
// on some platforms, so we'll simply handle both events in the same way.
|
||||||
out_event ();
|
out_event ();
|
||||||
@ -216,7 +216,7 @@ int zmq::ipc_connecter_t::open ()
|
|||||||
s, addr->resolved.ipc_addr->addr (),
|
s, addr->resolved.ipc_addr->addr (),
|
||||||
addr->resolved.ipc_addr->addrlen ());
|
addr->resolved.ipc_addr->addrlen ());
|
||||||
|
|
||||||
// Connect was successfull immediately.
|
// Connect was successful immediately.
|
||||||
if (rc == 0)
|
if (rc == 0)
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
|
@ -83,7 +83,7 @@ namespace zmq
|
|||||||
int get_new_reconnect_ivl ();
|
int get_new_reconnect_ivl ();
|
||||||
|
|
||||||
// Open IPC connecting socket. Returns -1 in case of error,
|
// Open IPC connecting socket. Returns -1 in case of error,
|
||||||
// 0 if connect was successfull immediately. Returns -1 with
|
// 0 if connect was successful immediately. Returns -1 with
|
||||||
// EAGAIN errno if async connect was launched.
|
// EAGAIN errno if async connect was launched.
|
||||||
int open ();
|
int open ();
|
||||||
|
|
||||||
@ -91,7 +91,7 @@ namespace zmq
|
|||||||
int close ();
|
int close ();
|
||||||
|
|
||||||
// Get the file descriptor of newly created connection. Returns
|
// Get the file descriptor of newly created connection. Returns
|
||||||
// retired_fd if the connection was unsuccessfull.
|
// retired_fd if the connection was unsuccessful.
|
||||||
fd_t connect ();
|
fd_t connect ();
|
||||||
|
|
||||||
// Address to connect to. Owned by session_base_t.
|
// Address to connect to. Owned by session_base_t.
|
||||||
|
@ -84,7 +84,7 @@ namespace zmq
|
|||||||
// if the connection was dropped while waiting in the listen backlog.
|
// if the connection was dropped while waiting in the listen backlog.
|
||||||
fd_t accept ();
|
fd_t accept ();
|
||||||
|
|
||||||
// True, if the undelying file for UNIX domain socket exists.
|
// True, if the underlying file for UNIX domain socket exists.
|
||||||
bool has_file;
|
bool has_file;
|
||||||
|
|
||||||
// Name of the file associated with the UNIX domain address.
|
// Name of the file associated with the UNIX domain address.
|
||||||
@ -96,7 +96,7 @@ namespace zmq
|
|||||||
// Handle corresponding to the listening socket.
|
// Handle corresponding to the listening socket.
|
||||||
handle_t handle;
|
handle_t handle;
|
||||||
|
|
||||||
// Socket the listerner belongs to.
|
// Socket the listener belongs to.
|
||||||
zmq::socket_base_t *socket;
|
zmq::socket_base_t *socket;
|
||||||
|
|
||||||
// String representation of endpoint to bind to
|
// String representation of endpoint to bind to
|
||||||
|
@ -39,7 +39,7 @@ namespace zmq
|
|||||||
{
|
{
|
||||||
|
|
||||||
// Abstract class representing security mechanism.
|
// Abstract class representing security mechanism.
|
||||||
// Different mechanism extedns this class.
|
// Different mechanism extends this class.
|
||||||
|
|
||||||
class msg_t;
|
class msg_t;
|
||||||
|
|
||||||
|
@ -91,7 +91,7 @@ bool zmq::mtrie_t::add_helper (unsigned char *prefix_, size_t size_,
|
|||||||
if (c < min || c >= min + count) {
|
if (c < min || c >= min + count) {
|
||||||
|
|
||||||
// The character is out of range of currently handled
|
// The character is out of range of currently handled
|
||||||
// charcters. We have to extend the table.
|
// characters. We have to extend the table.
|
||||||
if (!count) {
|
if (!count) {
|
||||||
min = c;
|
min = c;
|
||||||
count = 1;
|
count = 1;
|
||||||
|
@ -167,7 +167,7 @@ namespace zmq
|
|||||||
bool tx_more_bit;
|
bool tx_more_bit;
|
||||||
bool zmq_output_ready; // zmq has msg(s) to send
|
bool zmq_output_ready; // zmq has msg(s) to send
|
||||||
bool norm_tx_ready; // norm has tx queue vacancy
|
bool norm_tx_ready; // norm has tx queue vacancy
|
||||||
// tbd - maybe don't need buffer if can access zmq message buffer directly?
|
// TBD - maybe don't need buffer if can access zmq message buffer directly?
|
||||||
char tx_buffer[BUFFER_SIZE];
|
char tx_buffer[BUFFER_SIZE];
|
||||||
unsigned int tx_index;
|
unsigned int tx_index;
|
||||||
unsigned int tx_len;
|
unsigned int tx_len;
|
||||||
|
@ -110,7 +110,7 @@ namespace zmq
|
|||||||
void send_reaped ();
|
void send_reaped ();
|
||||||
void send_done ();
|
void send_done ();
|
||||||
|
|
||||||
// These handlers can be overrided by the derived objects. They are
|
// These handlers can be overridden by the derived objects. They are
|
||||||
// called when command arrives from another thread.
|
// called when command arrives from another thread.
|
||||||
virtual void process_stop ();
|
virtual void process_stop ();
|
||||||
virtual void process_plug ();
|
virtual void process_plug ();
|
||||||
|
@ -142,7 +142,7 @@ namespace zmq
|
|||||||
bool raw_socket;
|
bool raw_socket;
|
||||||
bool raw_notify; // Provide connect notifications
|
bool raw_notify; // Provide connect notifications
|
||||||
|
|
||||||
// Addres of SOCKS proxy
|
// Address of SOCKS proxy
|
||||||
std::string socks_proxy_address;
|
std::string socks_proxy_address;
|
||||||
|
|
||||||
// TCP keep-alive settings.
|
// TCP keep-alive settings.
|
||||||
|
@ -138,7 +138,7 @@ void zmq::own_t::terminate ()
|
|||||||
if (terminating)
|
if (terminating)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
// As for the root of the ownership tree, there's noone to terminate it,
|
// As for the root of the ownership tree, there's no one to terminate it,
|
||||||
// so it has to terminate itself.
|
// so it has to terminate itself.
|
||||||
if (!owner) {
|
if (!owner) {
|
||||||
process_term (options.linger);
|
process_term (options.linger);
|
||||||
|
@ -96,12 +96,12 @@ namespace zmq
|
|||||||
// specific type of the owned object correctly.
|
// specific type of the owned object correctly.
|
||||||
virtual ~own_t ();
|
virtual ~own_t ();
|
||||||
|
|
||||||
// Term handler is protocted rather than private so that it can
|
// Term handler is protected rather than private so that it can
|
||||||
// be intercepted by the derived class. This is useful to add custom
|
// be intercepted by the derived class. This is useful to add custom
|
||||||
// steps to the beginning of the termination process.
|
// steps to the beginning of the termination process.
|
||||||
void process_term (int linger_);
|
void process_term (int linger_);
|
||||||
|
|
||||||
// A place to hook in when phyicallal destruction of the object
|
// A place to hook in when physical destruction of the object
|
||||||
// is to be delayed.
|
// is to be delayed.
|
||||||
virtual void process_destroy ();
|
virtual void process_destroy ();
|
||||||
|
|
||||||
@ -119,7 +119,7 @@ namespace zmq
|
|||||||
void process_term_ack ();
|
void process_term_ack ();
|
||||||
void process_seqnum ();
|
void process_seqnum ();
|
||||||
|
|
||||||
// Check whether all the peding term acks were delivered.
|
// Check whether all the pending term acks were delivered.
|
||||||
// If so, deallocate this object.
|
// If so, deallocate this object.
|
||||||
void check_term_acks ();
|
void check_term_acks ();
|
||||||
|
|
||||||
|
@ -226,7 +226,7 @@ void zmq::pgm_receiver_t::in_event ()
|
|||||||
zmq_assert (offset <= insize);
|
zmq_assert (offset <= insize);
|
||||||
zmq_assert (it->second.decoder == NULL);
|
zmq_assert (it->second.decoder == NULL);
|
||||||
|
|
||||||
// We have to move data to the begining of the first message.
|
// We have to move data to the beginning of the first message.
|
||||||
inpos += offset;
|
inpos += offset;
|
||||||
insize -= offset;
|
insize -= offset;
|
||||||
|
|
||||||
|
@ -77,7 +77,7 @@ int zmq::pgm_sender_t::init (bool udp_encapsulation_, const char *network_)
|
|||||||
|
|
||||||
void zmq::pgm_sender_t::plug (io_thread_t *io_thread_, session_base_t *session_)
|
void zmq::pgm_sender_t::plug (io_thread_t *io_thread_, session_base_t *session_)
|
||||||
{
|
{
|
||||||
// Alocate 2 fds for PGM socket.
|
// Allocate 2 fds for PGM socket.
|
||||||
fd_t downlink_socket_fd = retired_fd;
|
fd_t downlink_socket_fd = retired_fd;
|
||||||
fd_t uplink_socket_fd = retired_fd;
|
fd_t uplink_socket_fd = retired_fd;
|
||||||
fd_t rdata_notify_fd = retired_fd;
|
fd_t rdata_notify_fd = retired_fd;
|
||||||
@ -95,7 +95,7 @@ void zmq::pgm_sender_t::plug (io_thread_t *io_thread_, session_base_t *session_)
|
|||||||
pending_notify_handle = add_fd (pending_notify_fd);
|
pending_notify_handle = add_fd (pending_notify_fd);
|
||||||
|
|
||||||
// Set POLLIN. We wont never want to stop polling for uplink = we never
|
// Set POLLIN. We wont never want to stop polling for uplink = we never
|
||||||
// want to stop porocess NAKs.
|
// want to stop processing NAKs.
|
||||||
set_pollin (uplink_handle);
|
set_pollin (uplink_handle);
|
||||||
set_pollin (rdata_notify_handle);
|
set_pollin (rdata_notify_handle);
|
||||||
set_pollin (pending_notify_handle);
|
set_pollin (pending_notify_handle);
|
||||||
|
@ -263,7 +263,7 @@ void zmq::pipe_t::process_activate_read ()
|
|||||||
|
|
||||||
void zmq::pipe_t::process_activate_write (uint64_t msgs_read_)
|
void zmq::pipe_t::process_activate_write (uint64_t msgs_read_)
|
||||||
{
|
{
|
||||||
// Remember the peers's message sequence number.
|
// Remember the peer's message sequence number.
|
||||||
peers_msgs_read = msgs_read_;
|
peers_msgs_read = msgs_read_;
|
||||||
|
|
||||||
if (!out_active && state == active) {
|
if (!out_active && state == active) {
|
||||||
@ -385,7 +385,7 @@ void zmq::pipe_t::terminate (bool delay_)
|
|||||||
// Overload the value specified at pipe creation.
|
// Overload the value specified at pipe creation.
|
||||||
delay = delay_;
|
delay = delay_;
|
||||||
|
|
||||||
// If terminate was already called, we can ignore the duplicit invocation.
|
// If terminate was already called, we can ignore the duplicate invocation.
|
||||||
if (state == term_req_sent1 || state == term_req_sent2) {
|
if (state == term_req_sent1 || state == term_req_sent2) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -95,7 +95,7 @@ int zmq::req_t::xsend (msg_t *msg_)
|
|||||||
|
|
||||||
message_begins = false;
|
message_begins = false;
|
||||||
|
|
||||||
// Eat all currently avaliable messages before the request is fully
|
// Eat all currently available messages before the request is fully
|
||||||
// sent. This is done to avoid:
|
// sent. This is done to avoid:
|
||||||
// REQ sends request to A, A replies, B replies too.
|
// REQ sends request to A, A replies, B replies too.
|
||||||
// A's reply was first and matches, that is used.
|
// A's reply was first and matches, that is used.
|
||||||
|
@ -65,7 +65,7 @@ namespace zmq
|
|||||||
private:
|
private:
|
||||||
|
|
||||||
// If true, request was already sent and reply wasn't received yet or
|
// If true, request was already sent and reply wasn't received yet or
|
||||||
// was raceived partially.
|
// was received partially.
|
||||||
bool receiving_reply;
|
bool receiving_reply;
|
||||||
|
|
||||||
// If true, we are starting to send/recv a message. The first part
|
// If true, we are starting to send/recv a message. The first part
|
||||||
|
@ -388,7 +388,7 @@ bool zmq::router_t::xhas_in ()
|
|||||||
bool zmq::router_t::xhas_out ()
|
bool zmq::router_t::xhas_out ()
|
||||||
{
|
{
|
||||||
// In theory, ROUTER socket is always ready for writing. Whether actual
|
// In theory, ROUTER socket is always ready for writing. Whether actual
|
||||||
// attempt to write succeeds depends on whitch pipe the message is going
|
// attempt to write succeeds depends on which pipe the message is going
|
||||||
// to be routed to.
|
// to be routed to.
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
@ -102,7 +102,7 @@ namespace zmq
|
|||||||
// Checks if an fd_entry_t is retired.
|
// Checks if an fd_entry_t is retired.
|
||||||
static bool is_retired_fd (const fd_entry_t &entry);
|
static bool is_retired_fd (const fd_entry_t &entry);
|
||||||
|
|
||||||
// Set of file descriptors that are used to retreive
|
// Set of file descriptors that are used to retrieve
|
||||||
// information for fd_set.
|
// information for fd_set.
|
||||||
typedef std::vector <fd_entry_t> fd_set_t;
|
typedef std::vector <fd_entry_t> fd_set_t;
|
||||||
fd_set_t fds;
|
fd_set_t fds;
|
||||||
|
@ -162,7 +162,7 @@ bool zmq::server_t::xhas_in ()
|
|||||||
bool zmq::server_t::xhas_out ()
|
bool zmq::server_t::xhas_out ()
|
||||||
{
|
{
|
||||||
// In theory, SERVER socket is always ready for writing. Whether actual
|
// In theory, SERVER socket is always ready for writing. Whether actual
|
||||||
// attempt to write succeeds depends on whitch pipe the message is going
|
// attempt to write succeeds depends on which pipe the message is going
|
||||||
// to be routed to.
|
// to be routed to.
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
@ -213,7 +213,7 @@ int zmq::signaler_t::wait (int timeout_)
|
|||||||
{
|
{
|
||||||
#ifdef HAVE_FORK
|
#ifdef HAVE_FORK
|
||||||
if (unlikely (pid != getpid ())) {
|
if (unlikely (pid != getpid ())) {
|
||||||
// we have forked and the file descriptor is closed. Emulate an interupt
|
// we have forked and the file descriptor is closed. Emulate an interrupt
|
||||||
// response.
|
// response.
|
||||||
//printf("Child process %d signaler_t::wait returning simulating interrupt #1\n", getpid());
|
//printf("Child process %d signaler_t::wait returning simulating interrupt #1\n", getpid());
|
||||||
errno = EINTR;
|
errno = EINTR;
|
||||||
@ -238,7 +238,7 @@ int zmq::signaler_t::wait (int timeout_)
|
|||||||
#ifdef HAVE_FORK
|
#ifdef HAVE_FORK
|
||||||
else
|
else
|
||||||
if (unlikely (pid != getpid ())) {
|
if (unlikely (pid != getpid ())) {
|
||||||
// we have forked and the file descriptor is closed. Emulate an interupt
|
// we have forked and the file descriptor is closed. Emulate an interrupt
|
||||||
// response.
|
// response.
|
||||||
//printf("Child process %d signaler_t::wait returning simulating interrupt #2\n", getpid());
|
//printf("Child process %d signaler_t::wait returning simulating interrupt #2\n", getpid());
|
||||||
errno = EINTR;
|
errno = EINTR;
|
||||||
|
@ -64,7 +64,7 @@ namespace zmq
|
|||||||
|
|
||||||
private:
|
private:
|
||||||
|
|
||||||
// Creates a pair of filedescriptors that will be used
|
// Creates a pair of file descriptors that will be used
|
||||||
// to pass the signals.
|
// to pass the signals.
|
||||||
static int make_fdpair (fd_t *r_, fd_t *w_);
|
static int make_fdpair (fd_t *r_, fd_t *w_);
|
||||||
|
|
||||||
|
@ -237,7 +237,7 @@ int zmq::socket_base_t::parse_uri (const char *uri_,
|
|||||||
|
|
||||||
int zmq::socket_base_t::check_protocol (const std::string &protocol_)
|
int zmq::socket_base_t::check_protocol (const std::string &protocol_)
|
||||||
{
|
{
|
||||||
// First check out whether the protcol is something we are aware of.
|
// First check out whether the protocol is something we are aware of.
|
||||||
if (protocol_ != "inproc"
|
if (protocol_ != "inproc"
|
||||||
&& protocol_ != "ipc"
|
&& protocol_ != "ipc"
|
||||||
&& protocol_ != "tcp"
|
&& protocol_ != "tcp"
|
||||||
@ -249,7 +249,7 @@ int zmq::socket_base_t::check_protocol (const std::string &protocol_)
|
|||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
// If 0MQ is not compiled with OpenPGM, pgm and epgm transports
|
// If 0MQ is not compiled with OpenPGM, pgm and epgm transports
|
||||||
// are not avaialble.
|
// are not available.
|
||||||
#if !defined ZMQ_HAVE_OPENPGM
|
#if !defined ZMQ_HAVE_OPENPGM
|
||||||
if (protocol_ == "pgm" || protocol_ == "epgm") {
|
if (protocol_ == "pgm" || protocol_ == "epgm") {
|
||||||
errno = EPROTONOSUPPORT;
|
errno = EPROTONOSUPPORT;
|
||||||
@ -512,7 +512,7 @@ int zmq::socket_base_t::bind (const char *addr_)
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (protocol == "pgm" || protocol == "epgm" || protocol == "norm") {
|
if (protocol == "pgm" || protocol == "epgm" || protocol == "norm") {
|
||||||
// For convenience's sake, bind can be used interchageable with
|
// For convenience's sake, bind can be used interchangeable with
|
||||||
// connect for PGM, EPGM and NORM transports.
|
// connect for PGM, EPGM and NORM transports.
|
||||||
EXIT_MUTEX();
|
EXIT_MUTEX();
|
||||||
rc = connect (addr_);
|
rc = connect (addr_);
|
||||||
@ -521,7 +521,7 @@ int zmq::socket_base_t::bind (const char *addr_)
|
|||||||
return rc;
|
return rc;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Remaining trasnports require to be run in an I/O thread, so at this
|
// Remaining transports require to be run in an I/O thread, so at this
|
||||||
// point we'll choose one.
|
// point we'll choose one.
|
||||||
io_thread_t *io_thread = choose_io_thread (options.affinity);
|
io_thread_t *io_thread = choose_io_thread (options.affinity);
|
||||||
if (!io_thread) {
|
if (!io_thread) {
|
||||||
|
@ -99,7 +99,7 @@ namespace zmq
|
|||||||
bool has_in ();
|
bool has_in ();
|
||||||
bool has_out ();
|
bool has_out ();
|
||||||
|
|
||||||
// Using this function reaper thread ask the socket to regiter with
|
// Using this function reaper thread ask the socket to register with
|
||||||
// its poller.
|
// its poller.
|
||||||
void start_reaping (poller_t *poller_);
|
void start_reaping (poller_t *poller_);
|
||||||
|
|
||||||
@ -171,7 +171,7 @@ namespace zmq
|
|||||||
// Delay actual destruction of the socket.
|
// Delay actual destruction of the socket.
|
||||||
void process_destroy ();
|
void process_destroy ();
|
||||||
|
|
||||||
// Socket event data dispath
|
// Socket event data dispatch
|
||||||
void monitor_event (int event_, int value_, const std::string& addr_);
|
void monitor_event (int event_, int value_, const std::string& addr_);
|
||||||
|
|
||||||
// Monitor socket cleanup
|
// Monitor socket cleanup
|
||||||
|
@ -361,7 +361,7 @@ int zmq::socks_connecter_t::connect_to_proxy ()
|
|||||||
// Connect to the remote peer.
|
// Connect to the remote peer.
|
||||||
rc = ::connect (s, tcp_addr->addr (), tcp_addr->addrlen ());
|
rc = ::connect (s, tcp_addr->addr (), tcp_addr->addrlen ());
|
||||||
|
|
||||||
// Connect was successfull immediately.
|
// Connect was successful immediately.
|
||||||
if (rc == 0)
|
if (rc == 0)
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
|
@ -103,7 +103,7 @@ namespace zmq
|
|||||||
int get_new_reconnect_ivl ();
|
int get_new_reconnect_ivl ();
|
||||||
|
|
||||||
// Open TCP connecting socket. Returns -1 in case of error,
|
// Open TCP connecting socket. Returns -1 in case of error,
|
||||||
// 0 if connect was successfull immediately. Returns -1 with
|
// 0 if connect was successful immediately. Returns -1 with
|
||||||
// EAGAIN errno if async connect was launched.
|
// EAGAIN errno if async connect was launched.
|
||||||
int open ();
|
int open ();
|
||||||
|
|
||||||
@ -111,7 +111,7 @@ namespace zmq
|
|||||||
void close ();
|
void close ();
|
||||||
|
|
||||||
// Get the file descriptor of newly created connection. Returns
|
// Get the file descriptor of newly created connection. Returns
|
||||||
// retired_fd if the connection was unsuccessfull.
|
// retired_fd if the connection was unsuccessful.
|
||||||
zmq::fd_t check_proxy_connection ();
|
zmq::fd_t check_proxy_connection ();
|
||||||
|
|
||||||
socks_greeting_encoder_t greeting_encoder;
|
socks_greeting_encoder_t greeting_encoder;
|
||||||
|
@ -226,7 +226,7 @@ int zmq::stream_t::xrecv (msg_t *msg_)
|
|||||||
zmq_assert ((prefetched_msg.flags () & msg_t::more) == 0);
|
zmq_assert ((prefetched_msg.flags () & msg_t::more) == 0);
|
||||||
|
|
||||||
// We have received a frame with TCP data.
|
// We have received a frame with TCP data.
|
||||||
// Rather than sendig this frame, we keep it in prefetched
|
// Rather than sending this frame, we keep it in prefetched
|
||||||
// buffer and send a frame with peer's ID.
|
// buffer and send a frame with peer's ID.
|
||||||
blob_t identity = pipe->get_identity ();
|
blob_t identity = pipe->get_identity ();
|
||||||
rc = msg_->close();
|
rc = msg_->close();
|
||||||
|
@ -477,7 +477,7 @@ int zmq::tcp_address_t::to_string (std::string &addr_)
|
|||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Not using service resolv because of
|
// Not using service resolving because of
|
||||||
// https://github.com/zeromq/libzmq/commit/1824574f9b5a8ce786853320e3ea09fe1f822bc4
|
// https://github.com/zeromq/libzmq/commit/1824574f9b5a8ce786853320e3ea09fe1f822bc4
|
||||||
char hbuf [NI_MAXHOST];
|
char hbuf [NI_MAXHOST];
|
||||||
int rc = getnameinfo (addr (), addrlen (), hbuf, sizeof hbuf, NULL, 0, NI_NUMERICHOST);
|
int rc = getnameinfo (addr (), addrlen (), hbuf, sizeof hbuf, NULL, 0, NI_NUMERICHOST);
|
||||||
|
@ -51,7 +51,7 @@ namespace zmq
|
|||||||
virtual ~tcp_address_t ();
|
virtual ~tcp_address_t ();
|
||||||
|
|
||||||
// This function translates textual TCP address into an address
|
// This function translates textual TCP address into an address
|
||||||
// strcuture. If 'local' is true, names are resolved as local interface
|
// structure. If 'local' is true, names are resolved as local interface
|
||||||
// names. If it is false, names are resolved as remote hostnames.
|
// names. If it is false, names are resolved as remote hostnames.
|
||||||
// If 'ipv6' is true, the name may resolve to IPv6 address.
|
// If 'ipv6' is true, the name may resolve to IPv6 address.
|
||||||
int resolve (const char *name_, bool local_, bool ipv6_, bool is_src_ = false);
|
int resolve (const char *name_, bool local_, bool ipv6_, bool is_src_ = false);
|
||||||
|
@ -311,7 +311,7 @@ int zmq::tcp_connecter_t::open ()
|
|||||||
// Connect to the remote peer.
|
// Connect to the remote peer.
|
||||||
rc = ::connect (s, tcp_addr->addr (), tcp_addr->addrlen ());
|
rc = ::connect (s, tcp_addr->addr (), tcp_addr->addrlen ());
|
||||||
|
|
||||||
// Connect was successfull immediately.
|
// Connect was successful immediately.
|
||||||
if (rc == 0)
|
if (rc == 0)
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
|
@ -83,7 +83,7 @@ namespace zmq
|
|||||||
int get_new_reconnect_ivl ();
|
int get_new_reconnect_ivl ();
|
||||||
|
|
||||||
// Open TCP connecting socket. Returns -1 in case of error,
|
// Open TCP connecting socket. Returns -1 in case of error,
|
||||||
// 0 if connect was successfull immediately. Returns -1 with
|
// 0 if connect was successful immediately. Returns -1 with
|
||||||
// EAGAIN errno if async connect was launched.
|
// EAGAIN errno if async connect was launched.
|
||||||
int open ();
|
int open ();
|
||||||
|
|
||||||
@ -91,7 +91,7 @@ namespace zmq
|
|||||||
void close ();
|
void close ();
|
||||||
|
|
||||||
// Get the file descriptor of newly created connection. Returns
|
// Get the file descriptor of newly created connection. Returns
|
||||||
// retired_fd if the connection was unsuccessfull.
|
// retired_fd if the connection was unsuccessful.
|
||||||
fd_t connect ();
|
fd_t connect ();
|
||||||
|
|
||||||
// Address to connect to. Owned by session_base_t.
|
// Address to connect to. Owned by session_base_t.
|
||||||
|
@ -84,7 +84,7 @@ namespace zmq
|
|||||||
// Handle corresponding to the listening socket.
|
// Handle corresponding to the listening socket.
|
||||||
handle_t handle;
|
handle_t handle;
|
||||||
|
|
||||||
// Socket the listerner belongs to.
|
// Socket the listener belongs to.
|
||||||
zmq::socket_base_t *socket;
|
zmq::socket_base_t *socket;
|
||||||
|
|
||||||
// String representation of endpoint to bind to
|
// String representation of endpoint to bind to
|
||||||
|
@ -102,7 +102,7 @@ void zmq::tipc_connecter_t::process_term (int linger_)
|
|||||||
|
|
||||||
void zmq::tipc_connecter_t::in_event ()
|
void zmq::tipc_connecter_t::in_event ()
|
||||||
{
|
{
|
||||||
// We are not polling for incomming data, so we are actually called
|
// We are not polling for incoming data, so we are actually called
|
||||||
// because of error here. However, we can get error on out event as well
|
// because of error here. However, we can get error on out event as well
|
||||||
// on some platforms, so we'll simply handle both events in the same way.
|
// on some platforms, so we'll simply handle both events in the same way.
|
||||||
out_event ();
|
out_event ();
|
||||||
@ -213,7 +213,7 @@ int zmq::tipc_connecter_t::open ()
|
|||||||
s, addr->resolved.tipc_addr->addr (),
|
s, addr->resolved.tipc_addr->addr (),
|
||||||
addr->resolved.tipc_addr->addrlen ());
|
addr->resolved.tipc_addr->addrlen ());
|
||||||
|
|
||||||
// Connect was successfull immediately.
|
// Connect was successful immediately.
|
||||||
if (rc == 0)
|
if (rc == 0)
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
|
@ -81,7 +81,7 @@ namespace zmq
|
|||||||
void close ();
|
void close ();
|
||||||
|
|
||||||
// Get the file descriptor of newly created connection. Returns
|
// Get the file descriptor of newly created connection. Returns
|
||||||
// retired_fd if the connection was unsuccessfull.
|
// retired_fd if the connection was unsuccessful.
|
||||||
fd_t connect ();
|
fd_t connect ();
|
||||||
|
|
||||||
// Address to connect to. Owned by session_base_t.
|
// Address to connect to. Owned by session_base_t.
|
||||||
@ -121,7 +121,7 @@ namespace zmq
|
|||||||
int get_new_reconnect_ivl ();
|
int get_new_reconnect_ivl ();
|
||||||
|
|
||||||
// Open IPC connecting socket. Returns -1 in case of error,
|
// Open IPC connecting socket. Returns -1 in case of error,
|
||||||
// 0 if connect was successfull immediately. Returns -1 with
|
// 0 if connect was successful immediately. Returns -1 with
|
||||||
// EAGAIN errno if async connect was launched.
|
// EAGAIN errno if async connect was launched.
|
||||||
int open ();
|
int open ();
|
||||||
|
|
||||||
|
@ -140,7 +140,7 @@ int zmq::tipc_listener_t::set_address (const char *addr_)
|
|||||||
if (rc != 0)
|
if (rc != 0)
|
||||||
goto error;
|
goto error;
|
||||||
|
|
||||||
// Listen for incomming connections.
|
// Listen for incoming connections.
|
||||||
rc = listen (s, options.backlog);
|
rc = listen (s, options.backlog);
|
||||||
if (rc != 0)
|
if (rc != 0)
|
||||||
goto error;
|
goto error;
|
||||||
|
@ -89,7 +89,7 @@ namespace zmq
|
|||||||
// Handle corresponding to the listening socket.
|
// Handle corresponding to the listening socket.
|
||||||
handle_t handle;
|
handle_t handle;
|
||||||
|
|
||||||
// Socket the listerner belongs to.
|
// Socket the listener belongs to.
|
||||||
zmq::socket_base_t *socket;
|
zmq::socket_base_t *socket;
|
||||||
|
|
||||||
// String representation of endpoint to bind to
|
// String representation of endpoint to bind to
|
||||||
|
@ -75,7 +75,7 @@ bool zmq::trie_t::add (unsigned char *prefix_, size_t size_)
|
|||||||
if (c < min || c >= min + count) {
|
if (c < min || c >= min + count) {
|
||||||
|
|
||||||
// The character is out of range of currently handled
|
// The character is out of range of currently handled
|
||||||
// charcters. We have to extend the table.
|
// characters. We have to extend the table.
|
||||||
if (!count) {
|
if (!count) {
|
||||||
min = c;
|
min = c;
|
||||||
count = 1;
|
count = 1;
|
||||||
|
@ -117,7 +117,7 @@ int zmq::v2_decoder_t::size_ready(uint64_t msg_size, unsigned char const* read_p
|
|||||||
if (unlikely ((unsigned char*)read_pos + msg_size > (data() + size())))
|
if (unlikely ((unsigned char*)read_pos + msg_size > (data() + size())))
|
||||||
{
|
{
|
||||||
// a new message has started, but the size would exceed the pre-allocated arena
|
// a new message has started, but the size would exceed the pre-allocated arena
|
||||||
// this happens everytime when a message does not fit completely into the buffer
|
// this happens every time when a message does not fit completely into the buffer
|
||||||
rc = in_progress.init_size (static_cast <size_t> (msg_size));
|
rc = in_progress.init_size (static_cast <size_t> (msg_size));
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
|
@ -281,8 +281,8 @@ void zmq::xpub_t::send_unsubscription (unsigned char *data_, size_t size_,
|
|||||||
xpub_t *self = (xpub_t*) arg_;
|
xpub_t *self = (xpub_t*) arg_;
|
||||||
|
|
||||||
if (self->options.type != ZMQ_PUB) {
|
if (self->options.type != ZMQ_PUB) {
|
||||||
// Place the unsubscription to the queue of pending (un)sunscriptions
|
// Place the unsubscription to the queue of pending (un)subscriptions
|
||||||
// to be retrived by the user later on.
|
// to be retrieved by the user later on.
|
||||||
blob_t unsub (size_ + 1, 0);
|
blob_t unsub (size_ + 1, 0);
|
||||||
unsub [0] = 0;
|
unsub [0] = 0;
|
||||||
if (size_ > 0)
|
if (size_ > 0)
|
||||||
|
@ -68,7 +68,7 @@ namespace zmq
|
|||||||
|
|
||||||
private:
|
private:
|
||||||
|
|
||||||
// Function to be applied to the trie to send all the subsciptions
|
// Function to be applied to the trie to send all the subscriptions
|
||||||
// upstream.
|
// upstream.
|
||||||
static void send_unsubscription (unsigned char *data_, size_t size_,
|
static void send_unsubscription (unsigned char *data_, size_t size_,
|
||||||
void *arg_);
|
void *arg_);
|
||||||
|
@ -841,7 +841,7 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
|
|||||||
nevents++;
|
nevents++;
|
||||||
}
|
}
|
||||||
|
|
||||||
// If timout is zero, exit immediately whether there are events or not.
|
// If timeout is zero, exit immediately whether there are events or not.
|
||||||
if (timeout_ == 0)
|
if (timeout_ == 0)
|
||||||
break;
|
break;
|
||||||
|
|
||||||
@ -1024,7 +1024,7 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
|
|||||||
nevents++;
|
nevents++;
|
||||||
}
|
}
|
||||||
|
|
||||||
// If timout is zero, exit immediately whether there are events or not.
|
// If timeout is zero, exit immediately whether there are events or not.
|
||||||
if (timeout_ == 0)
|
if (timeout_ == 0)
|
||||||
break;
|
break;
|
||||||
|
|
||||||
@ -1298,7 +1298,7 @@ int zmq_pollfd_poll (void* p_, zmq_pollitem_t *items_, int nitems_, long timeout
|
|||||||
nevents++;
|
nevents++;
|
||||||
}
|
}
|
||||||
|
|
||||||
// If timout is zero, exit immediately whether there are events or not.
|
// If timeout is zero, exit immediately whether there are events or not.
|
||||||
if (timeout_ == 0)
|
if (timeout_ == 0)
|
||||||
break;
|
break;
|
||||||
|
|
||||||
@ -1513,7 +1513,7 @@ int zmq_pollfd_poll (void* p_, zmq_pollitem_t *items_, int nitems_, long timeout
|
|||||||
nevents++;
|
nevents++;
|
||||||
}
|
}
|
||||||
|
|
||||||
// If timout is zero, exit immediately whether there are events or not.
|
// If timeout is zero, exit immediately whether there are events or not.
|
||||||
if (timeout_ == 0)
|
if (timeout_ == 0)
|
||||||
break;
|
break;
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user