mirror of
https://github.com/zeromq/libzmq.git
synced 2025-10-23 00:08:02 +02:00
Problem: inconsistency of using terms address and uri
Solution: use terms more consistently
This commit is contained in:
@@ -97,15 +97,17 @@
|
|||||||
#include "scatter.hpp"
|
#include "scatter.hpp"
|
||||||
#include "dgram.hpp"
|
#include "dgram.hpp"
|
||||||
|
|
||||||
void zmq::socket_base_t::inprocs_t::emplace (const char *addr_, pipe_t *pipe_)
|
void zmq::socket_base_t::inprocs_t::emplace (const char *endpoint_uri_,
|
||||||
|
pipe_t *pipe_)
|
||||||
{
|
{
|
||||||
_inprocs.ZMQ_MAP_INSERT_OR_EMPLACE (std::string (addr_), pipe_);
|
_inprocs.ZMQ_MAP_INSERT_OR_EMPLACE (std::string (endpoint_uri_), pipe_);
|
||||||
}
|
}
|
||||||
|
|
||||||
int zmq::socket_base_t::inprocs_t::erase_pipes (const std::string &addr_str_)
|
int zmq::socket_base_t::inprocs_t::erase_pipes (
|
||||||
|
const std::string &endpoint_uri_str_)
|
||||||
{
|
{
|
||||||
const std::pair<map_t::iterator, map_t::iterator> range =
|
const std::pair<map_t::iterator, map_t::iterator> range =
|
||||||
_inprocs.equal_range (addr_str_);
|
_inprocs.equal_range (endpoint_uri_str_);
|
||||||
if (range.first == range.second) {
|
if (range.first == range.second) {
|
||||||
errno = ENOENT;
|
errno = ENOENT;
|
||||||
return -1;
|
return -1;
|
||||||
@@ -297,9 +299,11 @@ void zmq::socket_base_t::stop ()
|
|||||||
send_stop ();
|
send_stop ();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO consider renaming protocol_ to scheme_ in conformance with RFC 3986
|
||||||
|
// terminology, but this requires extensive changes to be consistent
|
||||||
int zmq::socket_base_t::parse_uri (const char *uri_,
|
int zmq::socket_base_t::parse_uri (const char *uri_,
|
||||||
std::string &protocol_,
|
std::string &protocol_,
|
||||||
std::string &address_)
|
std::string &path_)
|
||||||
{
|
{
|
||||||
zmq_assert (uri_ != NULL);
|
zmq_assert (uri_ != NULL);
|
||||||
|
|
||||||
@@ -310,9 +314,9 @@ int zmq::socket_base_t::parse_uri (const char *uri_,
|
|||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
protocol_ = uri.substr (0, pos);
|
protocol_ = uri.substr (0, pos);
|
||||||
address_ = uri.substr (pos + 3);
|
path_ = uri.substr (pos + 3);
|
||||||
|
|
||||||
if (protocol_.empty () || address_.empty ()) {
|
if (protocol_.empty () || path_.empty ()) {
|
||||||
errno = EINVAL;
|
errno = EINVAL;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
@@ -495,7 +499,7 @@ void zmq::socket_base_t::remove_signaler (signaler_t *s_)
|
|||||||
(static_cast<mailbox_safe_t *> (_mailbox))->remove_signaler (s_);
|
(static_cast<mailbox_safe_t *> (_mailbox))->remove_signaler (s_);
|
||||||
}
|
}
|
||||||
|
|
||||||
int zmq::socket_base_t::bind (const char *addr_)
|
int zmq::socket_base_t::bind (const char *endpoint_uri_)
|
||||||
{
|
{
|
||||||
scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL);
|
scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL);
|
||||||
|
|
||||||
@@ -510,19 +514,20 @@ int zmq::socket_base_t::bind (const char *addr_)
|
|||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Parse addr_ string.
|
// Parse endpoint_uri_ string.
|
||||||
std::string protocol;
|
std::string protocol;
|
||||||
std::string address;
|
std::string address;
|
||||||
if (parse_uri (addr_, protocol, address) || check_protocol (protocol)) {
|
if (parse_uri (endpoint_uri_, protocol, address)
|
||||||
|
|| check_protocol (protocol)) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (protocol == "inproc") {
|
if (protocol == "inproc") {
|
||||||
const endpoint_t endpoint = {this, options};
|
const endpoint_t endpoint = {this, options};
|
||||||
rc = register_endpoint (addr_, endpoint);
|
rc = register_endpoint (endpoint_uri_, endpoint);
|
||||||
if (rc == 0) {
|
if (rc == 0) {
|
||||||
connect_pending (addr_, this);
|
connect_pending (endpoint_uri_, this);
|
||||||
_last_endpoint.assign (addr_);
|
_last_endpoint.assign (endpoint_uri_);
|
||||||
options.connected = true;
|
options.connected = true;
|
||||||
}
|
}
|
||||||
return rc;
|
return rc;
|
||||||
@@ -531,7 +536,7 @@ int zmq::socket_base_t::bind (const char *addr_)
|
|||||||
if (protocol == "pgm" || protocol == "epgm" || protocol == "norm") {
|
if (protocol == "pgm" || protocol == "epgm" || protocol == "norm") {
|
||||||
// For convenience's sake, bind can be used interchangeable with
|
// For convenience's sake, bind can be used interchangeable with
|
||||||
// connect for PGM, EPGM, NORM transports.
|
// connect for PGM, EPGM, NORM transports.
|
||||||
rc = connect (addr_);
|
rc = connect (endpoint_uri_);
|
||||||
if (rc != -1)
|
if (rc != -1)
|
||||||
options.connected = true;
|
options.connected = true;
|
||||||
return rc;
|
return rc;
|
||||||
@@ -586,7 +591,7 @@ int zmq::socket_base_t::bind (const char *addr_)
|
|||||||
// Save last endpoint URI
|
// Save last endpoint URI
|
||||||
paddr->to_string (_last_endpoint);
|
paddr->to_string (_last_endpoint);
|
||||||
|
|
||||||
add_endpoint (addr_, static_cast<own_t *> (session), newpipe);
|
add_endpoint (endpoint_uri_, static_cast<own_t *> (session), newpipe);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
@@ -656,7 +661,7 @@ int zmq::socket_base_t::bind (const char *addr_)
|
|||||||
// Save last endpoint URI
|
// Save last endpoint URI
|
||||||
listener->get_address (_last_endpoint);
|
listener->get_address (_last_endpoint);
|
||||||
|
|
||||||
add_endpoint (addr_, static_cast<own_t *> (listener), NULL);
|
add_endpoint (endpoint_uri_, static_cast<own_t *> (listener), NULL);
|
||||||
options.connected = true;
|
options.connected = true;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
@@ -686,7 +691,7 @@ int zmq::socket_base_t::bind (const char *addr_)
|
|||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
int zmq::socket_base_t::connect (const char *addr_)
|
int zmq::socket_base_t::connect (const char *endpoint_uri_)
|
||||||
{
|
{
|
||||||
scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL);
|
scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL);
|
||||||
|
|
||||||
@@ -701,10 +706,11 @@ int zmq::socket_base_t::connect (const char *addr_)
|
|||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Parse addr_ string.
|
// Parse endpoint_uri_ string.
|
||||||
std::string protocol;
|
std::string protocol;
|
||||||
std::string address;
|
std::string address;
|
||||||
if (parse_uri (addr_, protocol, address) || check_protocol (protocol)) {
|
if (parse_uri (endpoint_uri_, protocol, address)
|
||||||
|
|| check_protocol (protocol)) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -714,7 +720,7 @@ int zmq::socket_base_t::connect (const char *addr_)
|
|||||||
// is in place we should follow generic pipe creation algorithm.
|
// is in place we should follow generic pipe creation algorithm.
|
||||||
|
|
||||||
// Find the peer endpoint.
|
// Find the peer endpoint.
|
||||||
const endpoint_t peer = find_endpoint (addr_);
|
const endpoint_t peer = find_endpoint (endpoint_uri_);
|
||||||
|
|
||||||
// The total HWM for an inproc connection should be the sum of
|
// The total HWM for an inproc connection should be the sum of
|
||||||
// the binder's HWM and the connector's HWM.
|
// the binder's HWM and the connector's HWM.
|
||||||
@@ -754,7 +760,7 @@ int zmq::socket_base_t::connect (const char *addr_)
|
|||||||
send_routing_id (new_pipes[0], options);
|
send_routing_id (new_pipes[0], options);
|
||||||
|
|
||||||
const endpoint_t endpoint = {this, options};
|
const endpoint_t endpoint = {this, options};
|
||||||
pend_connection (std::string (addr_), endpoint, new_pipes);
|
pend_connection (std::string (endpoint_uri_), endpoint, new_pipes);
|
||||||
} else {
|
} else {
|
||||||
// If required, send the routing id of the local socket to the peer.
|
// If required, send the routing id of the local socket to the peer.
|
||||||
if (peer.options.recv_routing_id) {
|
if (peer.options.recv_routing_id) {
|
||||||
@@ -776,10 +782,10 @@ int zmq::socket_base_t::connect (const char *addr_)
|
|||||||
attach_pipe (new_pipes[0], false, true);
|
attach_pipe (new_pipes[0], false, true);
|
||||||
|
|
||||||
// Save last endpoint URI
|
// Save last endpoint URI
|
||||||
_last_endpoint.assign (addr_);
|
_last_endpoint.assign (endpoint_uri_);
|
||||||
|
|
||||||
// remember inproc connections for disconnect
|
// remember inproc connections for disconnect
|
||||||
_inprocs.emplace (addr_, new_pipes[0]);
|
_inprocs.emplace (endpoint_uri_, new_pipes[0]);
|
||||||
|
|
||||||
options.connected = true;
|
options.connected = true;
|
||||||
return 0;
|
return 0;
|
||||||
@@ -788,7 +794,7 @@ int zmq::socket_base_t::connect (const char *addr_)
|
|||||||
(options.type == ZMQ_DEALER || options.type == ZMQ_SUB
|
(options.type == ZMQ_DEALER || options.type == ZMQ_SUB
|
||||||
|| options.type == ZMQ_PUB || options.type == ZMQ_REQ);
|
|| options.type == ZMQ_PUB || options.type == ZMQ_REQ);
|
||||||
if (unlikely (is_single_connect)) {
|
if (unlikely (is_single_connect)) {
|
||||||
if (0 != _endpoints.count (addr_)) {
|
if (0 != _endpoints.count (endpoint_uri_)) {
|
||||||
// There is no valid use for multiple connects for SUB-PUB nor
|
// There is no valid use for multiple connects for SUB-PUB nor
|
||||||
// DEALER-ROUTER nor REQ-REP. Multiple connects produces
|
// DEALER-ROUTER nor REQ-REP. Multiple connects produces
|
||||||
// nonsensical results.
|
// nonsensical results.
|
||||||
@@ -964,11 +970,11 @@ int zmq::socket_base_t::connect (const char *addr_)
|
|||||||
// Save last endpoint URI
|
// Save last endpoint URI
|
||||||
paddr->to_string (_last_endpoint);
|
paddr->to_string (_last_endpoint);
|
||||||
|
|
||||||
add_endpoint (addr_, static_cast<own_t *> (session), newpipe);
|
add_endpoint (endpoint_uri_, static_cast<own_t *> (session), newpipe);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
std::string zmq::socket_base_t::resolve_tcp_addr (std::string endpoint_address_,
|
std::string zmq::socket_base_t::resolve_tcp_addr (std::string endpoint_uri_,
|
||||||
const char *tcp_address_)
|
const char *tcp_address_)
|
||||||
{
|
{
|
||||||
// The resolved last_endpoint is used as a key in the endpoints map.
|
// The resolved last_endpoint is used as a key in the endpoints map.
|
||||||
@@ -976,36 +982,36 @@ std::string zmq::socket_base_t::resolve_tcp_addr (std::string endpoint_address_,
|
|||||||
// IPv4-in-IPv6 mapping (EG: tcp://[::ffff:127.0.0.1]:9999), so try to
|
// IPv4-in-IPv6 mapping (EG: tcp://[::ffff:127.0.0.1]:9999), so try to
|
||||||
// resolve before giving up. Given at this stage we don't know whether a
|
// resolve before giving up. Given at this stage we don't know whether a
|
||||||
// socket is connected or bound, try with both.
|
// socket is connected or bound, try with both.
|
||||||
if (_endpoints.find (endpoint_address_) == _endpoints.end ()) {
|
if (_endpoints.find (endpoint_uri_) == _endpoints.end ()) {
|
||||||
tcp_address_t *tcp_addr = new (std::nothrow) tcp_address_t ();
|
tcp_address_t *tcp_addr = new (std::nothrow) tcp_address_t ();
|
||||||
alloc_assert (tcp_addr);
|
alloc_assert (tcp_addr);
|
||||||
int rc = tcp_addr->resolve (tcp_address_, false, options.ipv6);
|
int rc = tcp_addr->resolve (tcp_address_, false, options.ipv6);
|
||||||
|
|
||||||
if (rc == 0) {
|
if (rc == 0) {
|
||||||
tcp_addr->to_string (endpoint_address_);
|
tcp_addr->to_string (endpoint_uri_);
|
||||||
if (_endpoints.find (endpoint_address_) == _endpoints.end ()) {
|
if (_endpoints.find (endpoint_uri_) == _endpoints.end ()) {
|
||||||
rc = tcp_addr->resolve (tcp_address_, true, options.ipv6);
|
rc = tcp_addr->resolve (tcp_address_, true, options.ipv6);
|
||||||
if (rc == 0) {
|
if (rc == 0) {
|
||||||
tcp_addr->to_string (endpoint_address_);
|
tcp_addr->to_string (endpoint_uri_);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
LIBZMQ_DELETE (tcp_addr);
|
LIBZMQ_DELETE (tcp_addr);
|
||||||
}
|
}
|
||||||
return endpoint_address_;
|
return endpoint_uri_;
|
||||||
}
|
}
|
||||||
|
|
||||||
void zmq::socket_base_t::add_endpoint (const char *addr_,
|
void zmq::socket_base_t::add_endpoint (const char *endpoint_uri_,
|
||||||
own_t *endpoint_,
|
own_t *endpoint_,
|
||||||
pipe_t *pipe_)
|
pipe_t *pipe_)
|
||||||
{
|
{
|
||||||
// Activate the session. Make it a child of this socket.
|
// Activate the session. Make it a child of this socket.
|
||||||
launch_child (endpoint_);
|
launch_child (endpoint_);
|
||||||
_endpoints.ZMQ_MAP_INSERT_OR_EMPLACE (std::string (addr_),
|
_endpoints.ZMQ_MAP_INSERT_OR_EMPLACE (std::string (endpoint_uri_),
|
||||||
endpoint_pipe_t (endpoint_, pipe_));
|
endpoint_pipe_t (endpoint_, pipe_));
|
||||||
}
|
}
|
||||||
|
|
||||||
int zmq::socket_base_t::term_endpoint (const char *addr_)
|
int zmq::socket_base_t::term_endpoint (const char *endpoint_uri_)
|
||||||
{
|
{
|
||||||
scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL);
|
scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL);
|
||||||
|
|
||||||
@@ -1016,7 +1022,7 @@ int zmq::socket_base_t::term_endpoint (const char *addr_)
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Check whether endpoint address passed to the function is valid.
|
// Check whether endpoint address passed to the function is valid.
|
||||||
if (unlikely (!addr_)) {
|
if (unlikely (!endpoint_uri_)) {
|
||||||
errno = EINVAL;
|
errno = EINVAL;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
@@ -1028,30 +1034,31 @@ int zmq::socket_base_t::term_endpoint (const char *addr_)
|
|||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Parse addr_ string.
|
// Parse endpoint_uri_ string.
|
||||||
std::string protocol;
|
std::string uri_protocol;
|
||||||
std::string address;
|
std::string uri_path;
|
||||||
if (parse_uri (addr_, protocol, address) || check_protocol (protocol)) {
|
if (parse_uri (endpoint_uri_, uri_protocol, uri_path)
|
||||||
|
|| check_protocol (uri_protocol)) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
const std::string addr_str = std::string (addr_);
|
const std::string endpoint_uri_str = std::string (endpoint_uri_);
|
||||||
|
|
||||||
// Disconnect an inproc socket
|
// Disconnect an inproc socket
|
||||||
if (protocol == "inproc") {
|
if (uri_protocol == "inproc") {
|
||||||
return unregister_endpoint (addr_str, this) == 0
|
return unregister_endpoint (endpoint_uri_str, this) == 0
|
||||||
? 0
|
? 0
|
||||||
: _inprocs.erase_pipes (addr_str);
|
: _inprocs.erase_pipes (endpoint_uri_str);
|
||||||
}
|
}
|
||||||
|
|
||||||
const std::string resolved_addr =
|
const std::string resolved_endpoint_uri =
|
||||||
protocol == protocol_name::tcp
|
uri_protocol == protocol_name::tcp
|
||||||
? resolve_tcp_addr (addr_str, address.c_str ())
|
? resolve_tcp_addr (endpoint_uri_str, uri_path.c_str ())
|
||||||
: addr_str;
|
: endpoint_uri_str;
|
||||||
|
|
||||||
// Find the endpoints range (if any) corresponding to the addr_ string.
|
// Find the endpoints range (if any) corresponding to the endpoint_uri_ string.
|
||||||
const std::pair<endpoints_t::iterator, endpoints_t::iterator> range =
|
const std::pair<endpoints_t::iterator, endpoints_t::iterator> range =
|
||||||
_endpoints.equal_range (resolved_addr);
|
_endpoints.equal_range (resolved_endpoint_uri);
|
||||||
if (range.first == range.second) {
|
if (range.first == range.second) {
|
||||||
errno = ENOENT;
|
errno = ENOENT;
|
||||||
return -1;
|
return -1;
|
||||||
@@ -1564,7 +1571,7 @@ int zmq::socket_base_t::monitor (const char *endpoint_, int events_)
|
|||||||
stop_monitor ();
|
stop_monitor ();
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
// Parse addr_ string.
|
// Parse endpoint_uri_ string.
|
||||||
std::string protocol;
|
std::string protocol;
|
||||||
std::string address;
|
std::string address;
|
||||||
if (parse_uri (endpoint_, protocol, address) || check_protocol (protocol))
|
if (parse_uri (endpoint_, protocol, address) || check_protocol (protocol))
|
||||||
@@ -1599,101 +1606,104 @@ int zmq::socket_base_t::monitor (const char *endpoint_, int events_)
|
|||||||
return rc;
|
return rc;
|
||||||
}
|
}
|
||||||
|
|
||||||
void zmq::socket_base_t::event_connected (const std::string &addr_,
|
void zmq::socket_base_t::event_connected (const std::string &endpoint_uri_,
|
||||||
zmq::fd_t fd_)
|
zmq::fd_t fd_)
|
||||||
{
|
{
|
||||||
event (addr_, fd_, ZMQ_EVENT_CONNECTED);
|
event (endpoint_uri_, fd_, ZMQ_EVENT_CONNECTED);
|
||||||
}
|
}
|
||||||
|
|
||||||
void zmq::socket_base_t::event_connect_delayed (const std::string &addr_,
|
void zmq::socket_base_t::event_connect_delayed (
|
||||||
int err_)
|
const std::string &endpoint_uri_, int err_)
|
||||||
{
|
{
|
||||||
event (addr_, err_, ZMQ_EVENT_CONNECT_DELAYED);
|
event (endpoint_uri_, err_, ZMQ_EVENT_CONNECT_DELAYED);
|
||||||
}
|
}
|
||||||
|
|
||||||
void zmq::socket_base_t::event_connect_retried (const std::string &addr_,
|
void zmq::socket_base_t::event_connect_retried (
|
||||||
int interval_)
|
const std::string &endpoint_uri_, int interval_)
|
||||||
{
|
{
|
||||||
event (addr_, interval_, ZMQ_EVENT_CONNECT_RETRIED);
|
event (endpoint_uri_, interval_, ZMQ_EVENT_CONNECT_RETRIED);
|
||||||
}
|
}
|
||||||
|
|
||||||
void zmq::socket_base_t::event_listening (const std::string &addr_,
|
void zmq::socket_base_t::event_listening (const std::string &endpoint_uri_,
|
||||||
zmq::fd_t fd_)
|
zmq::fd_t fd_)
|
||||||
{
|
{
|
||||||
event (addr_, fd_, ZMQ_EVENT_LISTENING);
|
event (endpoint_uri_, fd_, ZMQ_EVENT_LISTENING);
|
||||||
}
|
}
|
||||||
|
|
||||||
void zmq::socket_base_t::event_bind_failed (const std::string &addr_, int err_)
|
void zmq::socket_base_t::event_bind_failed (const std::string &endpoint_uri_,
|
||||||
|
int err_)
|
||||||
{
|
{
|
||||||
event (addr_, err_, ZMQ_EVENT_BIND_FAILED);
|
event (endpoint_uri_, err_, ZMQ_EVENT_BIND_FAILED);
|
||||||
}
|
}
|
||||||
|
|
||||||
void zmq::socket_base_t::event_accepted (const std::string &addr_,
|
void zmq::socket_base_t::event_accepted (const std::string &endpoint_uri_,
|
||||||
zmq::fd_t fd_)
|
zmq::fd_t fd_)
|
||||||
{
|
{
|
||||||
event (addr_, fd_, ZMQ_EVENT_ACCEPTED);
|
event (endpoint_uri_, fd_, ZMQ_EVENT_ACCEPTED);
|
||||||
}
|
}
|
||||||
|
|
||||||
void zmq::socket_base_t::event_accept_failed (const std::string &addr_,
|
void zmq::socket_base_t::event_accept_failed (const std::string &endpoint_uri_,
|
||||||
int err_)
|
int err_)
|
||||||
{
|
{
|
||||||
event (addr_, err_, ZMQ_EVENT_ACCEPT_FAILED);
|
event (endpoint_uri_, err_, ZMQ_EVENT_ACCEPT_FAILED);
|
||||||
}
|
}
|
||||||
|
|
||||||
void zmq::socket_base_t::event_closed (const std::string &addr_, zmq::fd_t fd_)
|
void zmq::socket_base_t::event_closed (const std::string &endpoint_uri_,
|
||||||
|
zmq::fd_t fd_)
|
||||||
{
|
{
|
||||||
event (addr_, fd_, ZMQ_EVENT_CLOSED);
|
event (endpoint_uri_, fd_, ZMQ_EVENT_CLOSED);
|
||||||
}
|
}
|
||||||
|
|
||||||
void zmq::socket_base_t::event_close_failed (const std::string &addr_, int err_)
|
void zmq::socket_base_t::event_close_failed (const std::string &endpoint_uri_,
|
||||||
|
int err_)
|
||||||
{
|
{
|
||||||
event (addr_, err_, ZMQ_EVENT_CLOSE_FAILED);
|
event (endpoint_uri_, err_, ZMQ_EVENT_CLOSE_FAILED);
|
||||||
}
|
}
|
||||||
|
|
||||||
void zmq::socket_base_t::event_disconnected (const std::string &addr_,
|
void zmq::socket_base_t::event_disconnected (const std::string &endpoint_uri_,
|
||||||
zmq::fd_t fd_)
|
zmq::fd_t fd_)
|
||||||
{
|
{
|
||||||
event (addr_, fd_, ZMQ_EVENT_DISCONNECTED);
|
event (endpoint_uri_, fd_, ZMQ_EVENT_DISCONNECTED);
|
||||||
}
|
}
|
||||||
|
|
||||||
void zmq::socket_base_t::event_handshake_failed_no_detail (
|
void zmq::socket_base_t::event_handshake_failed_no_detail (
|
||||||
const std::string &addr_, int err_)
|
const std::string &endpoint_uri_, int err_)
|
||||||
{
|
{
|
||||||
event (addr_, err_, ZMQ_EVENT_HANDSHAKE_FAILED_NO_DETAIL);
|
event (endpoint_uri_, err_, ZMQ_EVENT_HANDSHAKE_FAILED_NO_DETAIL);
|
||||||
}
|
}
|
||||||
|
|
||||||
void zmq::socket_base_t::event_handshake_failed_protocol (
|
void zmq::socket_base_t::event_handshake_failed_protocol (
|
||||||
const std::string &addr_, int err_)
|
const std::string &endpoint_uri_, int err_)
|
||||||
{
|
{
|
||||||
event (addr_, err_, ZMQ_EVENT_HANDSHAKE_FAILED_PROTOCOL);
|
event (endpoint_uri_, err_, ZMQ_EVENT_HANDSHAKE_FAILED_PROTOCOL);
|
||||||
}
|
}
|
||||||
|
|
||||||
void zmq::socket_base_t::event_handshake_failed_auth (const std::string &addr_,
|
void zmq::socket_base_t::event_handshake_failed_auth (
|
||||||
int err_)
|
const std::string &endpoint_uri_, int err_)
|
||||||
{
|
{
|
||||||
event (addr_, err_, ZMQ_EVENT_HANDSHAKE_FAILED_AUTH);
|
event (endpoint_uri_, err_, ZMQ_EVENT_HANDSHAKE_FAILED_AUTH);
|
||||||
}
|
}
|
||||||
|
|
||||||
void zmq::socket_base_t::event_handshake_succeeded (const std::string &addr_,
|
void zmq::socket_base_t::event_handshake_succeeded (
|
||||||
int err_)
|
const std::string &endpoint_uri_, int err_)
|
||||||
{
|
{
|
||||||
event (addr_, err_, ZMQ_EVENT_HANDSHAKE_SUCCEEDED);
|
event (endpoint_uri_, err_, ZMQ_EVENT_HANDSHAKE_SUCCEEDED);
|
||||||
}
|
}
|
||||||
|
|
||||||
void zmq::socket_base_t::event (const std::string &addr_,
|
void zmq::socket_base_t::event (const std::string &endpoint_uri_,
|
||||||
intptr_t value_,
|
intptr_t value_,
|
||||||
int type_)
|
int type_)
|
||||||
{
|
{
|
||||||
scoped_lock_t lock (_monitor_sync);
|
scoped_lock_t lock (_monitor_sync);
|
||||||
if (_monitor_events & type_) {
|
if (_monitor_events & type_) {
|
||||||
monitor_event (type_, value_, addr_);
|
monitor_event (type_, value_, endpoint_uri_);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Send a monitor event
|
// Send a monitor event
|
||||||
void zmq::socket_base_t::monitor_event (int event_,
|
void zmq::socket_base_t::monitor_event (int event_,
|
||||||
intptr_t value_,
|
intptr_t value_,
|
||||||
const std::string &addr_) const
|
const std::string &endpoint_uri_) const
|
||||||
{
|
{
|
||||||
// this is a private method which is only called from
|
// this is a private method which is only called from
|
||||||
// contexts where the mutex has been locked before
|
// contexts where the mutex has been locked before
|
||||||
@@ -1711,8 +1721,9 @@ void zmq::socket_base_t::monitor_event (int event_,
|
|||||||
zmq_sendmsg (_monitor_socket, &msg, ZMQ_SNDMORE);
|
zmq_sendmsg (_monitor_socket, &msg, ZMQ_SNDMORE);
|
||||||
|
|
||||||
// Send address in second frame
|
// Send address in second frame
|
||||||
zmq_msg_init_size (&msg, addr_.size ());
|
zmq_msg_init_size (&msg, endpoint_uri_.size ());
|
||||||
memcpy (zmq_msg_data (&msg), addr_.c_str (), addr_.size ());
|
memcpy (zmq_msg_data (&msg), endpoint_uri_.c_str (),
|
||||||
|
endpoint_uri_.size ());
|
||||||
zmq_sendmsg (_monitor_socket, &msg, 0);
|
zmq_sendmsg (_monitor_socket, &msg, 0);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@@ -82,9 +82,9 @@ class socket_base_t : public own_t,
|
|||||||
// Interface for communication with the API layer.
|
// Interface for communication with the API layer.
|
||||||
int setsockopt (int option_, const void *optval_, size_t optvallen_);
|
int setsockopt (int option_, const void *optval_, size_t optvallen_);
|
||||||
int getsockopt (int option_, void *optval_, size_t *optvallen_);
|
int getsockopt (int option_, void *optval_, size_t *optvallen_);
|
||||||
int bind (const char *addr_);
|
int bind (const char *endpoint_uri_);
|
||||||
int connect (const char *addr_);
|
int connect (const char *endpoint_uri_);
|
||||||
int term_endpoint (const char *addr_);
|
int term_endpoint (const char *endpoint_uri_);
|
||||||
int send (zmq::msg_t *msg_, int flags_);
|
int send (zmq::msg_t *msg_, int flags_);
|
||||||
int recv (zmq::msg_t *msg_, int flags_);
|
int recv (zmq::msg_t *msg_, int flags_);
|
||||||
void add_signaler (signaler_t *s_);
|
void add_signaler (signaler_t *s_);
|
||||||
@@ -120,20 +120,24 @@ class socket_base_t : public own_t,
|
|||||||
|
|
||||||
int monitor (const char *endpoint_, int events_);
|
int monitor (const char *endpoint_, int events_);
|
||||||
|
|
||||||
void event_connected (const std::string &addr_, zmq::fd_t fd_);
|
void event_connected (const std::string &endpoint_uri_, zmq::fd_t fd_);
|
||||||
void event_connect_delayed (const std::string &addr_, int err_);
|
void event_connect_delayed (const std::string &endpoint_uri_, int err_);
|
||||||
void event_connect_retried (const std::string &addr_, int interval_);
|
void event_connect_retried (const std::string &endpoint_uri_,
|
||||||
void event_listening (const std::string &addr_, zmq::fd_t fd_);
|
int interval_);
|
||||||
void event_bind_failed (const std::string &addr_, int err_);
|
void event_listening (const std::string &endpoint_uri_, zmq::fd_t fd_);
|
||||||
void event_accepted (const std::string &addr_, zmq::fd_t fd_);
|
void event_bind_failed (const std::string &endpoint_uri_, int err_);
|
||||||
void event_accept_failed (const std::string &addr_, int err_);
|
void event_accepted (const std::string &endpoint_uri_, zmq::fd_t fd_);
|
||||||
void event_closed (const std::string &addr_, zmq::fd_t fd_);
|
void event_accept_failed (const std::string &endpoint_uri_, int err_);
|
||||||
void event_close_failed (const std::string &addr_, int err_);
|
void event_closed (const std::string &endpoint_uri_, zmq::fd_t fd_);
|
||||||
void event_disconnected (const std::string &addr_, zmq::fd_t fd_);
|
void event_close_failed (const std::string &endpoint_uri_, int err_);
|
||||||
void event_handshake_failed_no_detail (const std::string &addr_, int err_);
|
void event_disconnected (const std::string &endpoint_uri_, zmq::fd_t fd_);
|
||||||
void event_handshake_failed_protocol (const std::string &addr_, int err_);
|
void event_handshake_failed_no_detail (const std::string &endpoint_uri_,
|
||||||
void event_handshake_failed_auth (const std::string &addr_, int err_);
|
int err_);
|
||||||
void event_handshake_succeeded (const std::string &addr_, int err_);
|
void event_handshake_failed_protocol (const std::string &endpoint_uri_,
|
||||||
|
int err_);
|
||||||
|
void event_handshake_failed_auth (const std::string &endpoint_uri_,
|
||||||
|
int err_);
|
||||||
|
void event_handshake_succeeded (const std::string &endpoint_uri_, int err_);
|
||||||
|
|
||||||
// Query the state of a specific peer. The default implementation
|
// Query the state of a specific peer. The default implementation
|
||||||
// always returns an ENOTSUP error.
|
// always returns an ENOTSUP error.
|
||||||
@@ -182,17 +186,19 @@ class socket_base_t : public own_t,
|
|||||||
|
|
||||||
private:
|
private:
|
||||||
// test if event should be sent and then dispatch it
|
// test if event should be sent and then dispatch it
|
||||||
void event (const std::string &addr_, intptr_t value_, int type_);
|
void event (const std::string &endpoint_uri_, intptr_t value_, int type_);
|
||||||
|
|
||||||
// Socket event data dispatch
|
// Socket event data dispatch
|
||||||
void
|
void monitor_event (int event_,
|
||||||
monitor_event (int event_, intptr_t value_, const std::string &addr_) const;
|
intptr_t value_,
|
||||||
|
const std::string &endpoint_uri_) const;
|
||||||
|
|
||||||
// Monitor socket cleanup
|
// Monitor socket cleanup
|
||||||
void stop_monitor (bool send_monitor_stopped_event_ = true);
|
void stop_monitor (bool send_monitor_stopped_event_ = true);
|
||||||
|
|
||||||
// Creates new endpoint ID and adds the endpoint to the map.
|
// Creates new endpoint ID and adds the endpoint to the map.
|
||||||
void add_endpoint (const char *addr_, own_t *endpoint_, pipe_t *pipe_);
|
void
|
||||||
|
add_endpoint (const char *endpoint_uri_, own_t *endpoint_, pipe_t *pipe_);
|
||||||
|
|
||||||
// Map of open endpoints.
|
// Map of open endpoints.
|
||||||
typedef std::pair<own_t *, pipe_t *> endpoint_pipe_t;
|
typedef std::pair<own_t *, pipe_t *> endpoint_pipe_t;
|
||||||
@@ -203,8 +209,8 @@ class socket_base_t : public own_t,
|
|||||||
class inprocs_t
|
class inprocs_t
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
void emplace (const char *addr_, pipe_t *pipe_);
|
void emplace (const char *endpoint_uri_, pipe_t *pipe_);
|
||||||
int erase_pipes (const std::string &addr_str_);
|
int erase_pipes (const std::string &endpoint_uri_str_);
|
||||||
void erase_pipe (pipe_t *pipe_);
|
void erase_pipe (pipe_t *pipe_);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
@@ -234,7 +240,7 @@ class socket_base_t : public own_t,
|
|||||||
|
|
||||||
// Parse URI string.
|
// Parse URI string.
|
||||||
static int
|
static int
|
||||||
parse_uri (const char *uri_, std::string &protocol_, std::string &address_);
|
parse_uri (const char *uri_, std::string &protocol_, std::string &path_);
|
||||||
|
|
||||||
// Check whether transport protocol, as specified in connect or
|
// Check whether transport protocol, as specified in connect or
|
||||||
// bind, is available and compatible with the socket type.
|
// bind, is available and compatible with the socket type.
|
||||||
@@ -259,7 +265,7 @@ class socket_base_t : public own_t,
|
|||||||
|
|
||||||
void update_pipe_options (int option_);
|
void update_pipe_options (int option_);
|
||||||
|
|
||||||
std::string resolve_tcp_addr (std::string endpoint_address_,
|
std::string resolve_tcp_addr (std::string endpoint_uri_,
|
||||||
const char *tcp_address_);
|
const char *tcp_address_);
|
||||||
|
|
||||||
// Socket's mailbox object.
|
// Socket's mailbox object.
|
||||||
|
Reference in New Issue
Block a user