All devices conflated into a single implementation.

Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
This commit is contained in:
Martin Sustrik 2010-12-06 22:57:29 +01:00
parent ec61751e17
commit 8d6cafe066
8 changed files with 16 additions and 208 deletions

View File

@ -71,12 +71,12 @@ libzmq_la_SOURCES = \
connect_session.hpp \
ctx.hpp \
decoder.hpp \
device.hpp \
devpoll.hpp \
encoder.hpp \
epoll.hpp \
err.hpp \
fd.hpp \
forwarder.hpp \
fq.hpp \
i_inout.hpp \
io_object.hpp \
@ -106,7 +106,6 @@ libzmq_la_SOURCES = \
pub.hpp \
pull.hpp \
push.hpp \
queue.hpp \
rep.hpp \
req.hpp \
select.hpp \
@ -114,7 +113,6 @@ libzmq_la_SOURCES = \
session.hpp \
socket_base.hpp \
stdint.hpp \
streamer.hpp \
sub.hpp \
swap.hpp \
tcp_connecter.hpp \
@ -141,11 +139,11 @@ libzmq_la_SOURCES = \
ctx.cpp \
connect_session.cpp \
decoder.cpp \
device.cpp \
devpoll.cpp \
encoder.cpp \
epoll.cpp \
err.cpp \
forwarder.cpp \
fq.cpp \
io_object.cpp \
io_thread.cpp \
@ -167,13 +165,11 @@ libzmq_la_SOURCES = \
pull.cpp \
push.cpp \
pub.cpp \
queue.cpp \
rep.cpp \
req.cpp \
select.cpp \
session.cpp \
socket_base.cpp \
streamer.cpp \
sub.cpp \
swap.cpp \
tcp_connecter.cpp \

View File

@ -21,12 +21,12 @@
#include "../include/zmq.h"
#include "queue.hpp"
#include "device.hpp"
#include "socket_base.hpp"
#include "likely.hpp"
#include "err.hpp"
int zmq::queue (class socket_base_t *insocket_,
int zmq::device (class socket_base_t *insocket_,
class socket_base_t *outsocket_)
{
zmq_msg_t msg;

View File

@ -17,13 +17,13 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_QUEUE_HPP_INCLUDED__
#define __ZMQ_QUEUE_HPP_INCLUDED__
#ifndef __ZMQ_DEVICE_HPP_INCLUDED__
#define __ZMQ_DEVICE_HPP_INCLUDED__
namespace zmq
{
int queue (class socket_base_t *insocket_,
int device (class socket_base_t *insocket_,
class socket_base_t *outsocket_);
}

View File

@ -1,60 +0,0 @@
/*
Copyright (c) 2007-2010 iMatix Corporation
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "../include/zmq.h"
#include "forwarder.hpp"
#include "socket_base.hpp"
#include "likely.hpp"
#include "err.hpp"
int zmq::forwarder (socket_base_t *insocket_, socket_base_t *outsocket_)
{
zmq_msg_t msg;
int rc = zmq_msg_init (&msg);
errno_assert (rc == 0);
int64_t more;
size_t more_sz = sizeof (more);
while (true) {
rc = insocket_->recv (&msg, 0);
if (unlikely (rc < 0)) {
if (errno == ETERM)
return -1;
errno_assert (false);
}
rc = insocket_->getsockopt (ZMQ_RCVMORE, &more, &more_sz);
if (unlikely (rc < 0)) {
if (errno == ETERM)
return -1;
errno_assert (false);
}
rc = outsocket_->send (&msg, more ? ZMQ_SNDMORE : 0);
if (unlikely (rc < 0)) {
if (errno == ETERM)
return -1;
errno_assert (false);
}
}
return 0;
}

View File

@ -1,31 +0,0 @@
/*
Copyright (c) 2007-2010 iMatix Corporation
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_FORWARDER_HPP_INCLUDED__
#define __ZMQ_FORWARDER_HPP_INCLUDED__
namespace zmq
{
int forwarder (class socket_base_t *insocket_,
class socket_base_t *outsocket_);
}
#endif

View File

@ -1,60 +0,0 @@
/*
Copyright (c) 2007-2010 iMatix Corporation
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "../include/zmq.h"
#include "streamer.hpp"
#include "socket_base.hpp"
#include "likely.hpp"
#include "err.hpp"
int zmq::streamer (socket_base_t *insocket_, socket_base_t *outsocket_)
{
zmq_msg_t msg;
int rc = zmq_msg_init (&msg);
errno_assert (rc == 0);
int64_t more;
size_t more_sz = sizeof (more);
while (true) {
rc = insocket_->recv (&msg, 0);
if (unlikely (rc < 0)) {
if (errno == ETERM)
return -1;
errno_assert (false);
}
rc = insocket_->getsockopt (ZMQ_RCVMORE, &more, &more_sz);
if (unlikely (rc < 0)) {
if (errno == ETERM)
return -1;
errno_assert (false);
}
rc = outsocket_->send (&msg, more ? ZMQ_SNDMORE : 0);
if (unlikely (rc < 0)) {
if (errno == ETERM)
return -1;
errno_assert (false);
}
}
return 0;
}

View File

@ -1,31 +0,0 @@
/*
Copyright (c) 2007-2010 iMatix Corporation
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_STREAMER_HPP_INCLUDED__
#define __ZMQ_STREAMER_HPP_INCLUDED__
namespace zmq
{
int streamer (class socket_base_t *insocket_,
class socket_base_t *outsocket_);
}
#endif

View File

@ -39,9 +39,7 @@
#include <stdlib.h>
#include <new>
#include "forwarder.hpp"
#include "queue.hpp"
#include "streamer.hpp"
#include "device.hpp"
#include "socket_base.hpp"
#include "msg_content.hpp"
#include "stdint.hpp"
@ -685,19 +683,15 @@ int zmq_device (int device_, void *insocket_, void *outsocket_)
errno = EFAULT;
return -1;
}
switch (device_) {
case ZMQ_FORWARDER:
return zmq::forwarder ((zmq::socket_base_t*) insocket_,
(zmq::socket_base_t*) outsocket_);
case ZMQ_QUEUE:
return zmq::queue ((zmq::socket_base_t*) insocket_,
(zmq::socket_base_t*) outsocket_);
case ZMQ_STREAMER:
return zmq::streamer ((zmq::socket_base_t*) insocket_,
(zmq::socket_base_t*) outsocket_);
default:
return EINVAL;
if (device_ != ZMQ_FORWARDER && device_ != ZMQ_QUEUE &&
device_ != ZMQ_STREAMER) {
errno = EINVAL;
return -1;
}
return zmq::device ((zmq::socket_base_t*) insocket_,
(zmq::socket_base_t*) outsocket_);
}
////////////////////////////////////////////////////////////////////////////////