mirror of
https://github.com/zeromq/libzmq.git
synced 2025-02-22 23:11:03 +01:00
commit
77514e0e9f
14
src/ctx.cpp
14
src/ctx.cpp
@ -405,19 +405,23 @@ zmq::endpoint_t zmq::ctx_t::find_endpoint (const char *addr_)
|
||||
return endpoint;
|
||||
}
|
||||
|
||||
void zmq::ctx_t::pend_connection (const char *addr_, pending_connection_t &pending_connection_)
|
||||
void zmq::ctx_t::pend_connection (const std::string &addr_,
|
||||
const endpoint_t &endpoint_, pipe_t **pipes_)
|
||||
{
|
||||
const pending_connection_t pending_connection =
|
||||
{endpoint_, pipes_ [0], pipes_ [1]};
|
||||
|
||||
endpoints_sync.lock ();
|
||||
|
||||
endpoints_t::iterator it = endpoints.find (addr_);
|
||||
if (it == endpoints.end ()) {
|
||||
// Still no bind.
|
||||
pending_connection_.endpoint.socket->inc_seqnum ();
|
||||
pending_connections.insert (pending_connections_t::value_type (std::string (addr_), pending_connection_));
|
||||
endpoint_.socket->inc_seqnum ();
|
||||
pending_connections.insert (pending_connections_t::value_type (addr_, pending_connection));
|
||||
}
|
||||
else
|
||||
// Bind has happened in the mean time, connect directly
|
||||
connect_inproc_sockets(it->second.socket, it->second.options, pending_connection_, connect_side);
|
||||
connect_inproc_sockets (it->second.socket, it->second.options, pending_connection, connect_side);
|
||||
|
||||
endpoints_sync.unlock ();
|
||||
}
|
||||
@ -436,7 +440,7 @@ void zmq::ctx_t::connect_pending (const char *addr_, zmq::socket_base_t *bind_so
|
||||
}
|
||||
|
||||
void zmq::ctx_t::connect_inproc_sockets (zmq::socket_base_t *bind_socket_,
|
||||
options_t& bind_options, pending_connection_t &pending_connection_, side side_)
|
||||
options_t& bind_options, const pending_connection_t &pending_connection_, side side_)
|
||||
{
|
||||
bind_socket_->inc_seqnum();
|
||||
pending_connection_.bind_pipe->set_tid(bind_socket_->get_tid());
|
||||
|
18
src/ctx.hpp
18
src/ctx.hpp
@ -51,13 +51,6 @@ namespace zmq
|
||||
options_t options;
|
||||
};
|
||||
|
||||
struct pending_connection_t
|
||||
{
|
||||
endpoint_t endpoint;
|
||||
pipe_t* connect_pipe;
|
||||
pipe_t* bind_pipe;
|
||||
};
|
||||
|
||||
// Context object encapsulates all the global state associated with
|
||||
// the library.
|
||||
|
||||
@ -109,7 +102,8 @@ namespace zmq
|
||||
int register_endpoint (const char *addr_, endpoint_t &endpoint_);
|
||||
void unregister_endpoints (zmq::socket_base_t *socket_);
|
||||
endpoint_t find_endpoint (const char *addr_);
|
||||
void pend_connection (const char *addr_, pending_connection_t &pending_connection_);
|
||||
void pend_connection (const std::string &addr_,
|
||||
const endpoint_t &endpoint_, pipe_t **pipes_);
|
||||
void connect_pending (const char *addr_, zmq::socket_base_t *bind_socket_);
|
||||
|
||||
enum {
|
||||
@ -121,6 +115,12 @@ namespace zmq
|
||||
|
||||
private:
|
||||
|
||||
struct pending_connection_t
|
||||
{
|
||||
endpoint_t endpoint;
|
||||
pipe_t* connect_pipe;
|
||||
pipe_t* bind_pipe;
|
||||
};
|
||||
|
||||
// Used to check whether the object is a context.
|
||||
uint32_t tag;
|
||||
@ -196,7 +196,7 @@ namespace zmq
|
||||
pid_t pid;
|
||||
#endif
|
||||
enum side { connect_side, bind_side };
|
||||
void connect_inproc_sockets(zmq::socket_base_t *bind_socket_, options_t& bind_options, pending_connection_t &pending_connection_, side side_);
|
||||
void connect_inproc_sockets(zmq::socket_base_t *bind_socket_, options_t& bind_options, const pending_connection_t &pending_connection_, side side_);
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -152,9 +152,10 @@ zmq::endpoint_t zmq::object_t::find_endpoint (const char *addr_)
|
||||
return ctx->find_endpoint (addr_);
|
||||
}
|
||||
|
||||
void zmq::object_t::pend_connection (const char *addr_, pending_connection_t &pending_connection_)
|
||||
void zmq::object_t::pend_connection (const std::string &addr_,
|
||||
const endpoint_t &endpoint_, pipe_t **pipes_)
|
||||
{
|
||||
ctx->pend_connection (addr_, pending_connection_);
|
||||
ctx->pend_connection (addr_, endpoint_, pipes_);
|
||||
}
|
||||
|
||||
void zmq::object_t::connect_pending (const char *addr_, zmq::socket_base_t *bind_socket_)
|
||||
|
@ -20,6 +20,7 @@
|
||||
#ifndef __ZMQ_OBJECT_HPP_INCLUDED__
|
||||
#define __ZMQ_OBJECT_HPP_INCLUDED__
|
||||
|
||||
#include <string>
|
||||
#include "stdint.hpp"
|
||||
|
||||
namespace zmq
|
||||
@ -61,7 +62,8 @@ namespace zmq
|
||||
int register_endpoint (const char *addr_, zmq::endpoint_t &endpoint_);
|
||||
void unregister_endpoints (zmq::socket_base_t *socket_);
|
||||
zmq::endpoint_t find_endpoint (const char *addr_);
|
||||
void pend_connection (const char *addr_, pending_connection_t &pending_connection_);
|
||||
void pend_connection (const std::string &addr_,
|
||||
const endpoint_t &endpoint, pipe_t **pipes_);
|
||||
void connect_pending (const char *addr_, zmq::socket_base_t *bind_socket_);
|
||||
|
||||
void destroy_socket (zmq::socket_base_t *socket_);
|
||||
|
@ -527,9 +527,8 @@ int zmq::socket_base_t::connect (const char *addr_)
|
||||
zmq_assert (written);
|
||||
new_pipes [0]->flush ();
|
||||
|
||||
endpoint_t endpoint = {this, options};
|
||||
pending_connection_t pending_connection = {endpoint, new_pipes [0], new_pipes [1]};
|
||||
pend_connection (addr_, pending_connection);
|
||||
const endpoint_t endpoint = {this, options};
|
||||
pend_connection (std::string (addr_), endpoint, new_pipes);
|
||||
}
|
||||
else
|
||||
{
|
||||
|
Loading…
x
Reference in New Issue
Block a user