devices can be created via API

This commit is contained in:
Jon Dyte
2010-04-07 08:20:01 +02:00
committed by Martin Sustrik
parent 0777567e89
commit edfd05df8e
13 changed files with 309 additions and 119 deletions

View File

@@ -20,113 +20,6 @@
#include "../../include/zmq.hpp"
#include "../../foreign/xmlParser/xmlParser.cpp"
class queue
{
public:
queue (zmq::socket_t& reply, zmq::socket_t& request) :
xrep (reply),
xreq (request)
{
items [0].socket = reply;
items [0].fd = 0;
items [0].events = ZMQ_POLLIN;
items [0].revents = 0;
items [1].socket = request;
items [1].fd = 0;
items [1].events = ZMQ_POLLIN;
items [1].revents = 0;
m_next_request_method = &queue::get_request;
m_next_response_method = &queue::get_response;
}
void run()
{
while (true) {
int rc = zmq::poll (&items [0], 2, -1);
if (rc < 0)
break;
next_request();
next_response();
}
}
private:
void next_request()
{
(this->*m_next_request_method) ();
}
void next_response()
{
(this->*m_next_response_method) ();
}
void get_request()
{
if (items [0].revents & ZMQ_POLLIN ) {
int rc = xrep.recv (&request_msg, ZMQ_NOBLOCK);
if (!rc)
return;
items [0].events &= ~ZMQ_POLLIN;
items [1].events |= ZMQ_POLLOUT;
m_next_request_method = &queue::send_request;
}
}
void send_request()
{
if (items [1].revents & ZMQ_POLLOUT) {
int rc = xreq.send (request_msg, ZMQ_NOBLOCK);
if (!rc) return;
items [1].events &= ~ZMQ_POLLOUT;
items [0].events |= ZMQ_POLLIN;
m_next_request_method = &queue::get_request;
}
}
void get_response()
{
if ( items [1].revents & ZMQ_POLLIN ) {
int rc = xreq.recv (&response_msg, ZMQ_NOBLOCK);
if (!rc)
return;
items [1].events &= ~ZMQ_POLLIN;
items [0].events |= ZMQ_POLLOUT;
m_next_response_method = &queue::send_response;
}
}
void send_response()
{
if (items [0].revents & ZMQ_POLLOUT) {
int rc = xrep.send (response_msg, ZMQ_NOBLOCK);
if (!rc)
return;
items [0].events &= ~ZMQ_POLLOUT;
items [1].events |= ZMQ_POLLIN;
m_next_response_method = &queue::get_response;
}
}
zmq::socket_t & xrep;
zmq::socket_t & xreq;
zmq_pollitem_t items [2];
zmq::message_t request_msg;
zmq::message_t response_msg;
typedef void (queue::*next_method) ();
next_method m_next_request_method;
next_method m_next_response_method;
queue (queue const &);
void operator = (queue const &);
};
int main (int argc, char *argv [])
{
if (argc != 2) {
@@ -219,8 +112,7 @@ int main (int argc, char *argv [])
n++;
}
queue q(in_socket, out_socket);
q.run();
zmq::device (ZMQ_QUEUE, in_socket, out_socket);
return 0;
}