mirror of
https://github.com/zeromq/libzmq.git
synced 2024-12-12 10:33:52 +01:00
Merge pull request #1909 from somdoron/master
problem: push-pull socket types are not thread safe
This commit is contained in:
commit
99763cce41
2
.gitignore
vendored
2
.gitignore
vendored
@ -126,6 +126,8 @@ test_poller
|
||||
test_timers
|
||||
test_radio_dish
|
||||
test_udp
|
||||
test_scatter_gather
|
||||
test_socketopt_hwm
|
||||
test_use_fd_ipc
|
||||
test_use_fd_tcp
|
||||
tests/test*.log
|
||||
|
@ -491,7 +491,9 @@ set (cxx-sources
|
||||
radio.cpp
|
||||
dish.cpp
|
||||
udp_engine.cpp
|
||||
udp_address.cpp)
|
||||
udp_address.cpp
|
||||
scatter.cpp
|
||||
gather.cpp)
|
||||
|
||||
set (rc-sources version.rc)
|
||||
|
||||
|
10
Makefile.am
10
Makefile.am
@ -57,6 +57,8 @@ src_libzmq_la_SOURCES = \
|
||||
src/fd.hpp \
|
||||
src/fq.cpp \
|
||||
src/fq.hpp \
|
||||
src/gather.cpp \
|
||||
src/gather.hpp \
|
||||
src/gssapi_mechanism_base.cpp \
|
||||
src/gssapi_mechanism_base.hpp \
|
||||
src/gssapi_client.cpp \
|
||||
@ -153,6 +155,8 @@ src_libzmq_la_SOURCES = \
|
||||
src/req.hpp \
|
||||
src/router.cpp \
|
||||
src/router.hpp \
|
||||
src/scatter.cpp \
|
||||
src/scatter.hpp \
|
||||
src/select.cpp \
|
||||
src/select.hpp \
|
||||
src/server.cpp \
|
||||
@ -396,7 +400,8 @@ test_apps = \
|
||||
tests/test_poller \
|
||||
tests/test_timers \
|
||||
tests/test_radio_dish \
|
||||
tests/test_udp
|
||||
tests/test_udp \
|
||||
tests/test_scatter_gather
|
||||
|
||||
tests_test_system_SOURCES = tests/test_system.cpp
|
||||
tests_test_system_LDADD = src/libzmq.la
|
||||
@ -616,6 +621,9 @@ tests_test_radio_dish_LDADD = src/libzmq.la
|
||||
tests_test_udp_SOURCES = tests/test_udp.cpp
|
||||
tests_test_udp_LDADD = src/libzmq.la
|
||||
|
||||
tests_test_scatter_gather_SOURCES = tests/test_scatter_gather.cpp
|
||||
tests_test_scatter_gather_LDADD = src/libzmq.la
|
||||
|
||||
if !ON_MINGW
|
||||
if !ON_CYGWIN
|
||||
test_apps += \
|
||||
|
@ -264,6 +264,8 @@ ZMQ_EXPORT const char *zmq_msg_group (zmq_msg_t *msg);
|
||||
#define ZMQ_CLIENT 13
|
||||
#define ZMQ_RADIO 14
|
||||
#define ZMQ_DISH 15
|
||||
#define ZMQ_GATHER 16
|
||||
#define ZMQ_SCATTER 17
|
||||
|
||||
/* Deprecated aliases */
|
||||
#define ZMQ_XREQ ZMQ_DEALER
|
||||
|
94
src/gather.cpp
Normal file
94
src/gather.cpp
Normal file
@ -0,0 +1,94 @@
|
||||
/*
|
||||
Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
|
||||
|
||||
This file is part of libzmq, the ZeroMQ core engine in C++.
|
||||
|
||||
libzmq is free software; you can redistribute it and/or modify it under
|
||||
the terms of the GNU Lesser General Public License (LGPL) as published
|
||||
by the Free Software Foundation; either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
As a special exception, the Contributors give you permission to link
|
||||
this library with independent modules to produce an executable,
|
||||
regardless of the license terms of these independent modules, and to
|
||||
copy and distribute the resulting executable under terms of your choice,
|
||||
provided that you also meet, for each linked independent module, the
|
||||
terms and conditions of the license of that module. An independent
|
||||
module is a module which is not derived from or based on this library.
|
||||
If you modify this library, you must extend this exception to your
|
||||
version of the library.
|
||||
|
||||
libzmq is distributed in the hope that it will be useful, but WITHOUT
|
||||
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
|
||||
License for more details.
|
||||
|
||||
You should have received a copy of the GNU Lesser General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "precompiled.hpp"
|
||||
#include "macros.hpp"
|
||||
#include "gather.hpp"
|
||||
#include "err.hpp"
|
||||
#include "msg.hpp"
|
||||
#include "pipe.hpp"
|
||||
|
||||
zmq::gather_t::gather_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
|
||||
socket_base_t (parent_, tid_, sid_, true)
|
||||
{
|
||||
options.type = ZMQ_GATHER;
|
||||
}
|
||||
|
||||
zmq::gather_t::~gather_t ()
|
||||
{
|
||||
}
|
||||
|
||||
void zmq::gather_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
|
||||
{
|
||||
LIBZMQ_UNUSED (subscribe_to_all_);
|
||||
|
||||
zmq_assert (pipe_);
|
||||
fq.attach (pipe_);
|
||||
}
|
||||
|
||||
void zmq::gather_t::xread_activated (pipe_t *pipe_)
|
||||
{
|
||||
fq.activated (pipe_);
|
||||
}
|
||||
|
||||
void zmq::gather_t::xpipe_terminated (pipe_t *pipe_)
|
||||
{
|
||||
fq.pipe_terminated (pipe_);
|
||||
}
|
||||
|
||||
int zmq::gather_t::xrecv (msg_t *msg_)
|
||||
{
|
||||
int rc = fq.recvpipe (msg_, NULL);
|
||||
|
||||
// Drop any messages with more flag
|
||||
while (rc == 0 && msg_->flags () & msg_t::more) {
|
||||
|
||||
// drop all frames of the current multi-frame message
|
||||
rc = fq.recvpipe (msg_, NULL);
|
||||
|
||||
while (rc == 0 && msg_->flags () & msg_t::more)
|
||||
rc = fq.recvpipe (msg_, NULL);
|
||||
|
||||
// get the new message
|
||||
if (rc == 0)
|
||||
rc = fq.recvpipe (msg_, NULL);
|
||||
}
|
||||
|
||||
return rc;
|
||||
}
|
||||
|
||||
bool zmq::gather_t::xhas_in ()
|
||||
{
|
||||
return fq.has_in ();
|
||||
}
|
||||
|
||||
zmq::blob_t zmq::gather_t::get_credential () const
|
||||
{
|
||||
return fq.get_credential ();
|
||||
}
|
75
src/gather.hpp
Normal file
75
src/gather.hpp
Normal file
@ -0,0 +1,75 @@
|
||||
/*
|
||||
Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
|
||||
|
||||
This file is part of libzmq, the ZeroMQ core engine in C++.
|
||||
|
||||
libzmq is free software; you can redistribute it and/or modify it under
|
||||
the terms of the GNU Lesser General Public License (LGPL) as published
|
||||
by the Free Software Foundation; either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
As a special exception, the Contributors give you permission to link
|
||||
this library with independent modules to produce an executable,
|
||||
regardless of the license terms of these independent modules, and to
|
||||
copy and distribute the resulting executable under terms of your choice,
|
||||
provided that you also meet, for each linked independent module, the
|
||||
terms and conditions of the license of that module. An independent
|
||||
module is a module which is not derived from or based on this library.
|
||||
If you modify this library, you must extend this exception to your
|
||||
version of the library.
|
||||
|
||||
libzmq is distributed in the hope that it will be useful, but WITHOUT
|
||||
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
|
||||
License for more details.
|
||||
|
||||
You should have received a copy of the GNU Lesser General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#ifndef __ZMQ_GATHER_HPP_INCLUDED__
|
||||
#define __ZMQ_GATHER_HPP_INCLUDED__
|
||||
|
||||
#include "socket_base.hpp"
|
||||
#include "session_base.hpp"
|
||||
#include "fq.hpp"
|
||||
|
||||
namespace zmq
|
||||
{
|
||||
|
||||
class ctx_t;
|
||||
class pipe_t;
|
||||
class msg_t;
|
||||
class io_thread_t;
|
||||
|
||||
class gather_t :
|
||||
public socket_base_t
|
||||
{
|
||||
public:
|
||||
|
||||
gather_t (zmq::ctx_t *parent_, uint32_t tid_, int sid_);
|
||||
~gather_t ();
|
||||
|
||||
protected:
|
||||
|
||||
// Overrides of functions from socket_base_t.
|
||||
void xattach_pipe (zmq::pipe_t *pipe_, bool subscribe_to_all_);
|
||||
int xrecv (zmq::msg_t *msg_);
|
||||
bool xhas_in ();
|
||||
blob_t get_credential () const;
|
||||
void xread_activated (zmq::pipe_t *pipe_);
|
||||
void xpipe_terminated (zmq::pipe_t *pipe_);
|
||||
|
||||
private:
|
||||
|
||||
// Fair queueing object for inbound pipes.
|
||||
fq_t fq;
|
||||
|
||||
gather_t (const gather_t&);
|
||||
const gather_t &operator = (const gather_t&);
|
||||
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
#endif
|
@ -77,8 +77,9 @@ const char *zmq::mechanism_t::socket_type_string (int socket_type) const
|
||||
"DEALER", "ROUTER", "PULL", "PUSH",
|
||||
"XPUB", "XSUB", "STREAM",
|
||||
"SERVER", "CLIENT",
|
||||
"RADIO", "DISH"};
|
||||
zmq_assert (socket_type >= 0 && socket_type <= 15);
|
||||
"RADIO", "DISH",
|
||||
"GATHER", "SCATTER"};
|
||||
zmq_assert (socket_type >= 0 && socket_type <= 17);
|
||||
return names [socket_type];
|
||||
}
|
||||
|
||||
@ -198,6 +199,10 @@ bool zmq::mechanism_t::check_socket_type (const std::string& type_) const
|
||||
return type_ == "DISH";
|
||||
case ZMQ_DISH:
|
||||
return type_ == "RADIO";
|
||||
case ZMQ_GATHER:
|
||||
return type_ == "SCATTER";
|
||||
case ZMQ_SCATTER:
|
||||
return type_ == "GATHER";
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
83
src/scatter.cpp
Normal file
83
src/scatter.cpp
Normal file
@ -0,0 +1,83 @@
|
||||
/*
|
||||
Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
|
||||
|
||||
This file is part of libzmq, the ZeroMQ core engine in C++.
|
||||
|
||||
libzmq is free software; you can redistribute it and/or modify it under
|
||||
the terms of the GNU Lesser General Public License (LGPL) as published
|
||||
by the Free Software Foundation; either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
As a special exception, the Contributors give you permission to link
|
||||
this library with independent modules to produce an executable,
|
||||
regardless of the license terms of these independent modules, and to
|
||||
copy and distribute the resulting executable under terms of your choice,
|
||||
provided that you also meet, for each linked independent module, the
|
||||
terms and conditions of the license of that module. An independent
|
||||
module is a module which is not derived from or based on this library.
|
||||
If you modify this library, you must extend this exception to your
|
||||
version of the library.
|
||||
|
||||
libzmq is distributed in the hope that it will be useful, but WITHOUT
|
||||
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
|
||||
License for more details.
|
||||
|
||||
You should have received a copy of the GNU Lesser General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "precompiled.hpp"
|
||||
#include "macros.hpp"
|
||||
#include "scatter.hpp"
|
||||
#include "pipe.hpp"
|
||||
#include "err.hpp"
|
||||
#include "msg.hpp"
|
||||
|
||||
zmq::scatter_t::scatter_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
|
||||
socket_base_t (parent_, tid_, sid_, true)
|
||||
{
|
||||
options.type = ZMQ_SCATTER;
|
||||
}
|
||||
|
||||
zmq::scatter_t::~scatter_t ()
|
||||
{
|
||||
}
|
||||
|
||||
void zmq::scatter_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
|
||||
{
|
||||
LIBZMQ_UNUSED (subscribe_to_all_);
|
||||
|
||||
// Don't delay pipe termination as there is no one
|
||||
// to receive the delimiter.
|
||||
pipe_->set_nodelay ();
|
||||
|
||||
zmq_assert (pipe_);
|
||||
lb.attach (pipe_);
|
||||
}
|
||||
|
||||
void zmq::scatter_t::xwrite_activated (pipe_t *pipe_)
|
||||
{
|
||||
lb.activated (pipe_);
|
||||
}
|
||||
|
||||
void zmq::scatter_t::xpipe_terminated (pipe_t *pipe_)
|
||||
{
|
||||
lb.pipe_terminated (pipe_);
|
||||
}
|
||||
|
||||
int zmq::scatter_t::xsend (msg_t *msg_)
|
||||
{
|
||||
// SCATTER sockets do not allow multipart data (ZMQ_SNDMORE)
|
||||
if (msg_->flags () & msg_t::more) {
|
||||
errno = EINVAL;
|
||||
return -1;
|
||||
}
|
||||
|
||||
return lb.send (msg_);
|
||||
}
|
||||
|
||||
bool zmq::scatter_t::xhas_out ()
|
||||
{
|
||||
return lb.has_out ();
|
||||
}
|
73
src/scatter.hpp
Normal file
73
src/scatter.hpp
Normal file
@ -0,0 +1,73 @@
|
||||
/*
|
||||
Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
|
||||
|
||||
This file is part of libzmq, the ZeroMQ core engine in C++.
|
||||
|
||||
libzmq is free software; you can redistribute it and/or modify it under
|
||||
the terms of the GNU Lesser General Public License (LGPL) as published
|
||||
by the Free Software Foundation; either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
As a special exception, the Contributors give you permission to link
|
||||
this library with independent modules to produce an executable,
|
||||
regardless of the license terms of these independent modules, and to
|
||||
copy and distribute the resulting executable under terms of your choice,
|
||||
provided that you also meet, for each linked independent module, the
|
||||
terms and conditions of the license of that module. An independent
|
||||
module is a module which is not derived from or based on this library.
|
||||
If you modify this library, you must extend this exception to your
|
||||
version of the library.
|
||||
|
||||
libzmq is distributed in the hope that it will be useful, but WITHOUT
|
||||
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
|
||||
License for more details.
|
||||
|
||||
You should have received a copy of the GNU Lesser General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#ifndef __ZMQ_SCATTER_HPP_INCLUDED__
|
||||
#define __ZMQ_SCATTER_HPP_INCLUDED__
|
||||
|
||||
#include "socket_base.hpp"
|
||||
#include "session_base.hpp"
|
||||
#include "lb.hpp"
|
||||
|
||||
namespace zmq
|
||||
{
|
||||
|
||||
class ctx_t;
|
||||
class pipe_t;
|
||||
class msg_t;
|
||||
class io_thread_t;
|
||||
|
||||
class scatter_t :
|
||||
public socket_base_t
|
||||
{
|
||||
public:
|
||||
|
||||
scatter_t (zmq::ctx_t *parent_, uint32_t tid_, int sid_);
|
||||
~scatter_t ();
|
||||
|
||||
protected:
|
||||
|
||||
// Overrides of functions from socket_base_t.
|
||||
void xattach_pipe (zmq::pipe_t *pipe_, bool subscribe_to_all_);
|
||||
int xsend (zmq::msg_t *msg_);
|
||||
bool xhas_out ();
|
||||
void xwrite_activated (zmq::pipe_t *pipe_);
|
||||
void xpipe_terminated (zmq::pipe_t *pipe_);
|
||||
|
||||
private:
|
||||
|
||||
// Load balancer managing the outbound pipes.
|
||||
lb_t lb;
|
||||
|
||||
scatter_t (const scatter_t&);
|
||||
const scatter_t &operator = (const scatter_t&);
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
#endif
|
@ -81,6 +81,8 @@ zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_,
|
||||
case ZMQ_STREAM:
|
||||
case ZMQ_SERVER:
|
||||
case ZMQ_CLIENT:
|
||||
case ZMQ_GATHER:
|
||||
case ZMQ_SCATTER:
|
||||
s = new (std::nothrow) session_base_t (io_thread_, active_,
|
||||
socket_, options_, addr_);
|
||||
break;
|
||||
|
@ -96,6 +96,8 @@
|
||||
#include "client.hpp"
|
||||
#include "radio.hpp"
|
||||
#include "dish.hpp"
|
||||
#include "gather.hpp"
|
||||
#include "scatter.hpp"
|
||||
|
||||
#define ENTER_MUTEX() \
|
||||
if (thread_safe) \
|
||||
@ -163,6 +165,12 @@ zmq::socket_base_t *zmq::socket_base_t::create (int type_, class ctx_t *parent_,
|
||||
case ZMQ_DISH:
|
||||
s = new (std::nothrow) dish_t (parent_, tid_, sid_);
|
||||
break;
|
||||
case ZMQ_GATHER:
|
||||
s = new (std::nothrow) gather_t (parent_, tid_, sid_);
|
||||
break;
|
||||
case ZMQ_SCATTER:
|
||||
s = new (std::nothrow) scatter_t (parent_, tid_, sid_);
|
||||
break;
|
||||
default:
|
||||
errno = EINVAL;
|
||||
return NULL;
|
||||
|
@ -71,6 +71,7 @@ set(tests
|
||||
test_timers
|
||||
test_radio_dish
|
||||
test_udp
|
||||
test_scatter_gather
|
||||
)
|
||||
if(NOT WIN32)
|
||||
list(APPEND tests
|
||||
@ -162,4 +163,3 @@ foreach(TEST_SOURCE ${ALL_TEST_SOURCES})
|
||||
message(AUTHOR_WARNING "Test '${TESTNAME}' is not known to CTest.")
|
||||
endif()
|
||||
endforeach()
|
||||
|
||||
|
84
tests/test_scatter_gather.cpp
Normal file
84
tests/test_scatter_gather.cpp
Normal file
@ -0,0 +1,84 @@
|
||||
/*
|
||||
Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
|
||||
|
||||
This file is part of libzmq, the ZeroMQ core engine in C++.
|
||||
|
||||
libzmq is free software; you can redistribute it and/or modify it under
|
||||
the terms of the GNU Lesser General Public License (LGPL) as published
|
||||
by the Free Software Foundation; either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
As a special exception, the Contributors give you permission to link
|
||||
this library with independent modules to produce an executable,
|
||||
regardless of the license terms of these independent modules, and to
|
||||
copy and distribute the resulting executable under terms of your choice,
|
||||
provided that you also meet, for each linked independent module, the
|
||||
terms and conditions of the license of that module. An independent
|
||||
module is a module which is not derived from or based on this library.
|
||||
If you modify this library, you must extend this exception to your
|
||||
version of the library.
|
||||
|
||||
libzmq is distributed in the hope that it will be useful, but WITHOUT
|
||||
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
|
||||
License for more details.
|
||||
|
||||
You should have received a copy of the GNU Lesser General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "testutil.hpp"
|
||||
|
||||
int main (void)
|
||||
{
|
||||
setup_test_environment ();
|
||||
void *ctx = zmq_ctx_new ();
|
||||
assert (ctx);
|
||||
|
||||
void *scatter = zmq_socket (ctx, ZMQ_SCATTER);
|
||||
void *gather = zmq_socket (ctx, ZMQ_GATHER);
|
||||
void *gather2 = zmq_socket (ctx, ZMQ_GATHER);
|
||||
|
||||
int rc = zmq_bind (scatter, "inproc://test-scatter-gather");
|
||||
assert (rc == 0);
|
||||
|
||||
rc = zmq_connect (gather, "inproc://test-scatter-gather");
|
||||
assert (rc == 0);
|
||||
|
||||
rc = zmq_connect (gather2, "inproc://test-scatter-gather");
|
||||
assert (rc == 0);
|
||||
|
||||
// Should fail, multipart is not supported
|
||||
rc = s_sendmore (scatter, "1");
|
||||
assert (rc == -1);
|
||||
|
||||
rc = s_send (scatter, "1");
|
||||
assert (rc == 1);
|
||||
|
||||
rc = s_send (scatter, "2");
|
||||
assert (rc == 1);
|
||||
|
||||
char* message = s_recv (gather);
|
||||
assert (message);
|
||||
assert (streq(message, "1"));
|
||||
free(message);
|
||||
|
||||
message = s_recv (gather2);
|
||||
assert (message);
|
||||
assert (streq(message, "2"));
|
||||
free(message);
|
||||
|
||||
rc = zmq_close (scatter);
|
||||
assert (rc == 0);
|
||||
|
||||
rc = zmq_close (gather);
|
||||
assert (rc == 0);
|
||||
|
||||
rc = zmq_close (gather2);
|
||||
assert (rc == 0);
|
||||
|
||||
rc = zmq_ctx_term (ctx);
|
||||
assert (rc == 0);
|
||||
|
||||
return 0 ;
|
||||
}
|
Loading…
Reference in New Issue
Block a user