mirror of
https://github.com/zeromq/libzmq.git
synced 2025-01-31 14:39:55 +01:00
Problem: socket_base_t::connect_routing_id is protected and only used in router_t and stream_t
Solution: add an intermediary base class routing_socket_base_t, move common functionality there and make connect_routing_id private
This commit is contained in:
parent
25461a78dd
commit
728eddfcfd
@ -37,7 +37,7 @@
|
||||
#include "err.hpp"
|
||||
|
||||
zmq::router_t::router_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
|
||||
socket_base_t (parent_, tid_, sid_),
|
||||
routing_socket_base_t (parent_, tid_, sid_),
|
||||
_prefetched (false),
|
||||
_routing_id_sent (false),
|
||||
_current_in (NULL),
|
||||
@ -99,21 +99,12 @@ int zmq::router_t::xsetsockopt (int option_,
|
||||
const void *optval_,
|
||||
size_t optvallen_)
|
||||
{
|
||||
bool is_int = (optvallen_ == sizeof (int));
|
||||
const bool is_int = (optvallen_ == sizeof (int));
|
||||
int value = 0;
|
||||
if (is_int)
|
||||
memcpy (&value, optval_, sizeof (int));
|
||||
|
||||
switch (option_) {
|
||||
case ZMQ_CONNECT_ROUTING_ID:
|
||||
// TODO why isn't it possible to set an empty connect_routing_id
|
||||
// (which is the default value)
|
||||
if (optval_ && optvallen_) {
|
||||
connect_routing_id.assign ((char *) optval_, optvallen_);
|
||||
return 0;
|
||||
}
|
||||
break;
|
||||
|
||||
case ZMQ_ROUTER_RAW:
|
||||
if (is_int && value >= 0) {
|
||||
_raw_socket = (value != 0);
|
||||
@ -147,7 +138,8 @@ int zmq::router_t::xsetsockopt (int option_,
|
||||
break;
|
||||
|
||||
default:
|
||||
break;
|
||||
return routing_socket_base_t::xsetsockopt (option_, optval_,
|
||||
optvallen_);
|
||||
}
|
||||
errno = EINVAL;
|
||||
return -1;
|
||||
@ -469,13 +461,13 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_)
|
||||
bool ok;
|
||||
blob_t routing_id;
|
||||
|
||||
if (connect_routing_id.length ()) {
|
||||
routing_id.set ((unsigned char *) connect_routing_id.c_str (),
|
||||
connect_routing_id.length ());
|
||||
connect_routing_id.clear ();
|
||||
outpipes_t::iterator it = _out_pipes.find (routing_id);
|
||||
if (it != _out_pipes.end ())
|
||||
zmq_assert (false); // Not allowed to duplicate an existing rid
|
||||
const std::string connect_routing_id = extract_connect_routing_id ();
|
||||
if (!connect_routing_id.empty ()) {
|
||||
routing_id.set (
|
||||
reinterpret_cast<const unsigned char *> (connect_routing_id.c_str ()),
|
||||
connect_routing_id.length ());
|
||||
// Not allowed to duplicate an existing rid
|
||||
zmq_assert (0 == _out_pipes.count (routing_id));
|
||||
} else if (
|
||||
options
|
||||
.raw_socket) { // Always assign an integral routing id for raw-socket
|
||||
|
@ -45,7 +45,7 @@ class ctx_t;
|
||||
class pipe_t;
|
||||
|
||||
// TODO: This class uses O(n) scheduling. Rewrite it to use O(1) algorithm.
|
||||
class router_t : public socket_base_t
|
||||
class router_t : public routing_socket_base_t
|
||||
{
|
||||
public:
|
||||
router_t (zmq::ctx_t *parent_, uint32_t tid_, int sid_);
|
||||
|
@ -1762,3 +1762,36 @@ void zmq::socket_base_t::stop_monitor (bool send_monitor_stopped_event_)
|
||||
_monitor_events = 0;
|
||||
}
|
||||
}
|
||||
|
||||
zmq::routing_socket_base_t::routing_socket_base_t (class ctx_t *parent_,
|
||||
uint32_t tid_,
|
||||
int sid_) :
|
||||
socket_base_t (parent_, tid_, sid_)
|
||||
{
|
||||
}
|
||||
|
||||
int zmq::routing_socket_base_t::xsetsockopt (int option_,
|
||||
const void *optval_,
|
||||
size_t optvallen_)
|
||||
{
|
||||
switch (option_) {
|
||||
case ZMQ_CONNECT_ROUTING_ID:
|
||||
// TODO why isn't it possible to set an empty connect_routing_id
|
||||
// (which is the default value)
|
||||
if (optval_ && optvallen_) {
|
||||
_connect_routing_id.assign (static_cast<const char *> (optval_),
|
||||
optvallen_);
|
||||
return 0;
|
||||
}
|
||||
break;
|
||||
}
|
||||
errno = EINVAL;
|
||||
return -1;
|
||||
}
|
||||
|
||||
std::string zmq::routing_socket_base_t::extract_connect_routing_id ()
|
||||
{
|
||||
std::string res = ZMQ_MOVE (_connect_routing_id);
|
||||
_connect_routing_id.clear ();
|
||||
return res;
|
||||
}
|
||||
|
@ -184,9 +184,6 @@ class socket_base_t : public own_t,
|
||||
// Delay actual destruction of the socket.
|
||||
void process_destroy ();
|
||||
|
||||
// Next assigned name on a zmq_connect() call used by ROUTER and STREAM socket types
|
||||
std::string connect_routing_id;
|
||||
|
||||
private:
|
||||
// test if event should be sent and then dispatch it
|
||||
void event (const std::string &addr_, intptr_t fd_, int type_);
|
||||
@ -300,6 +297,21 @@ class socket_base_t : public own_t,
|
||||
socket_base_t (const socket_base_t &);
|
||||
const socket_base_t &operator= (const socket_base_t &);
|
||||
};
|
||||
|
||||
class routing_socket_base_t : public socket_base_t
|
||||
{
|
||||
protected:
|
||||
routing_socket_base_t (class ctx_t *parent_, uint32_t tid_, int sid_);
|
||||
|
||||
virtual int
|
||||
xsetsockopt (int option_, const void *optval_, size_t optvallen_);
|
||||
|
||||
std::string extract_connect_routing_id ();
|
||||
|
||||
private:
|
||||
// Next assigned name on a zmq_connect() call used by ROUTER and STREAM socket types
|
||||
std::string _connect_routing_id;
|
||||
};
|
||||
}
|
||||
|
||||
#endif
|
||||
|
@ -37,7 +37,7 @@
|
||||
#include "err.hpp"
|
||||
|
||||
zmq::stream_t::stream_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
|
||||
socket_base_t (parent_, tid_, sid_),
|
||||
routing_socket_base_t (parent_, tid_, sid_),
|
||||
_prefetched (false),
|
||||
_routing_id_sent (false),
|
||||
_current_out (NULL),
|
||||
@ -177,25 +177,14 @@ int zmq::stream_t::xsetsockopt (int option_,
|
||||
size_t optvallen_)
|
||||
{
|
||||
switch (option_) {
|
||||
case ZMQ_CONNECT_ROUTING_ID:
|
||||
// TODO why isn't it possible to set an empty connect_routing_id
|
||||
// (which is the default value)
|
||||
if (optval_ && optvallen_) {
|
||||
connect_routing_id.assign ((char *) optval_, optvallen_);
|
||||
return 0;
|
||||
}
|
||||
break;
|
||||
|
||||
case ZMQ_STREAM_NOTIFY:
|
||||
return do_setsockopt_int_as_bool_strict (optval_, optvallen_,
|
||||
&options.raw_notify);
|
||||
break;
|
||||
|
||||
default:
|
||||
break;
|
||||
return routing_socket_base_t::xsetsockopt (option_, optval_,
|
||||
optvallen_);
|
||||
}
|
||||
errno = EINVAL;
|
||||
return -1;
|
||||
}
|
||||
|
||||
int zmq::stream_t::xrecv (msg_t *msg_)
|
||||
@ -293,12 +282,13 @@ void zmq::stream_t::identify_peer (pipe_t *pipe_)
|
||||
unsigned char buffer[5];
|
||||
buffer[0] = 0;
|
||||
blob_t routing_id;
|
||||
if (connect_routing_id.length ()) {
|
||||
routing_id.set ((unsigned char *) connect_routing_id.c_str (),
|
||||
connect_routing_id.length ());
|
||||
connect_routing_id.clear ();
|
||||
outpipes_t::iterator it = _outpipes.find (routing_id);
|
||||
zmq_assert (it == _outpipes.end ());
|
||||
const std::string connect_routing_id = extract_connect_routing_id ();
|
||||
if (!connect_routing_id.empty ()) {
|
||||
routing_id.set (
|
||||
reinterpret_cast<const unsigned char *> (connect_routing_id.c_str ()),
|
||||
connect_routing_id.length ());
|
||||
// Not allowed to duplicate an existing rid
|
||||
zmq_assert (0 == _outpipes.count (routing_id));
|
||||
} else {
|
||||
put_uint32 (buffer + 1, _next_integral_routing_id++);
|
||||
routing_id.set (buffer, sizeof buffer);
|
||||
|
@ -39,7 +39,7 @@ namespace zmq
|
||||
class ctx_t;
|
||||
class pipe_t;
|
||||
|
||||
class stream_t : public socket_base_t
|
||||
class stream_t : public routing_socket_base_t
|
||||
{
|
||||
public:
|
||||
stream_t (zmq::ctx_t *parent_, uint32_t tid_, int sid_);
|
||||
|
Loading…
x
Reference in New Issue
Block a user