mirror of
https://github.com/zeromq/libzmq.git
synced 2025-10-14 15:05:38 +02:00
Merge branch 'master' of https://github.com/zeromq/libzmq
This commit is contained in:
@@ -121,7 +121,8 @@ zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_) :
|
||||
destroyed (false),
|
||||
last_tsc (0),
|
||||
ticks (0),
|
||||
rcvmore (false)
|
||||
rcvmore (false),
|
||||
thread_safe_flag (false)
|
||||
{
|
||||
}
|
||||
|
||||
@@ -209,14 +210,14 @@ int zmq::socket_base_t::check_protocol (const std::string &protocol_)
|
||||
return 0;
|
||||
}
|
||||
|
||||
void zmq::socket_base_t::attach_pipe (pipe_t *pipe_)
|
||||
void zmq::socket_base_t::attach_pipe (pipe_t *pipe_, bool icanhasall_)
|
||||
{
|
||||
// First, register the pipe so that we can terminate it later on.
|
||||
pipe_->set_event_sink (this);
|
||||
pipes.push_back (pipe_);
|
||||
|
||||
// Let the derived socket type know about new pipe.
|
||||
xattach_pipe (pipe_);
|
||||
xattach_pipe (pipe_, icanhasall_);
|
||||
|
||||
// If the socket is already being closed, ask any new pipes to terminate
|
||||
// straight away.
|
||||
@@ -458,8 +459,14 @@ int zmq::socket_base_t::connect (const char *addr_)
|
||||
rc = pipepair (parents, pipes, hwms, delays);
|
||||
errno_assert (rc == 0);
|
||||
|
||||
// PGM does not support subscription forwarding; ask for all data to be
|
||||
// sent to this pipe.
|
||||
bool icanhasall = false;
|
||||
if (protocol == "pgm" || protocol == "epgm")
|
||||
icanhasall = true;
|
||||
|
||||
// Attach local end of the pipe to the socket object.
|
||||
attach_pipe (pipes [0]);
|
||||
attach_pipe (pipes [0], icanhasall);
|
||||
|
||||
// Attach remote end of the pipe to the session object later on.
|
||||
session->attach_pipe (pipes [1]);
|
||||
@@ -871,3 +878,18 @@ void zmq::socket_base_t::extract_flags (msg_t *msg_)
|
||||
rcvmore = msg_->flags () & msg_t::more ? true : false;
|
||||
}
|
||||
|
||||
void zmq::socket_base_t::set_thread_safe()
|
||||
{
|
||||
thread_safe_flag = true;
|
||||
}
|
||||
|
||||
void zmq::socket_base_t::lock()
|
||||
{
|
||||
sync.lock();
|
||||
}
|
||||
|
||||
void zmq::socket_base_t::unlock()
|
||||
{
|
||||
sync.unlock();
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user