After speaking with Ben Gray and the discussion on the mailing list, this is an attempt to create a sockopt to allow connecting pipes to not immediately be available for traffic. The problem is in a PUSH to many PULL situation, where there is a connect to a PULL which is not there. This connect will immediately create a pipe (unlike bind), and traffic will be load balanced to that pipe. This means if there is a persistently unavailable end point then the traffic will queue until HWM is hit, and older messages will be lost.
This patch adds a sockopt ZMQ_DELAY_ATTACH_ON_CONNECT, which if set to 1 will attempt to preempt this behavior. It does this by extending the use of the session_base to include in the outbound as well as the inbound pipe, and only associates the pipe with the socket once it receives the connected callback via a process_attach message. This works, and a test has been added to show so, but may introduce unexpected complications. The shutdown logic in this class has become marginally more awkward because of this, requiring the session to serve as the sink for both pipes if shutdown occurs with a still-connecting pipe in place. It is also possible there could be issues around flushing the messages, but as I could not directly think how to create such an issue I have not written any code with regards to that.
The documentation has been updated to reflect the change, but please do check over the code and test and review.
2012-06-01 18:58:19 +02:00
|
|
|
/*
|
|
|
|
Copyright (c) 2012 Ian Barber
|
|
|
|
Copyright (c) 2012 Other contributors as noted in the AUTHORS file
|
|
|
|
|
|
|
|
This file is part of 0MQ.
|
|
|
|
|
|
|
|
0MQ is free software; you can redistribute it and/or modify it under
|
|
|
|
the terms of the GNU Lesser General Public License as published by
|
|
|
|
the Free Software Foundation; either version 3 of the License, or
|
|
|
|
(at your option) any later version.
|
|
|
|
|
|
|
|
0MQ is distributed in the hope that it will be useful,
|
|
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
|
|
GNU Lesser General Public License for more details.
|
|
|
|
|
|
|
|
You should have received a copy of the GNU Lesser General Public License
|
|
|
|
along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
*/
|
|
|
|
|
|
|
|
|
|
|
|
#include <assert.h>
|
|
|
|
#include <cstdlib>
|
|
|
|
#include <cstring>
|
|
|
|
#include <iostream>
|
|
|
|
#include <errno.h>
|
|
|
|
|
|
|
|
#include "../include/zmq.h"
|
|
|
|
|
2012-06-11 21:04:09 +02:00
|
|
|
static void *server (void *c)
|
|
|
|
{
|
|
|
|
void *socket, *context;
|
|
|
|
char buffer[16];
|
|
|
|
int rc, val;
|
|
|
|
|
2012-06-12 15:07:54 +02:00
|
|
|
shoulddie = *(long *)sd;
|
|
|
|
|
2012-06-11 21:04:09 +02:00
|
|
|
context = zmq_init (1);
|
|
|
|
assert (context);
|
|
|
|
|
|
|
|
socket = zmq_socket (context, ZMQ_PULL);
|
|
|
|
assert (socket);
|
|
|
|
|
|
|
|
val = 0;
|
|
|
|
rc = zmq_setsockopt(socket, ZMQ_LINGER, &val, sizeof(val));
|
|
|
|
assert (rc == 0);
|
|
|
|
|
|
|
|
rc = zmq_bind (socket, "ipc:///tmp/recon");
|
|
|
|
assert (rc == 0);
|
|
|
|
|
|
|
|
memset (&buffer, 0, sizeof(buffer));
|
|
|
|
rc = zmq_recv (socket, &buffer, sizeof(buffer), 0);
|
|
|
|
|
|
|
|
// Intentionally bail out
|
|
|
|
rc = zmq_close (socket);
|
|
|
|
assert (rc == 0);
|
|
|
|
|
|
|
|
rc = zmq_term (context);
|
|
|
|
assert (rc == 0);
|
|
|
|
|
|
|
|
usleep (200000);
|
|
|
|
|
|
|
|
context = zmq_init (1);
|
|
|
|
assert (context);
|
|
|
|
|
|
|
|
socket = zmq_socket (context, ZMQ_PULL);
|
|
|
|
assert (socket);
|
|
|
|
|
|
|
|
val = 0;
|
|
|
|
rc = zmq_setsockopt(socket, ZMQ_LINGER, &val, sizeof(val));
|
|
|
|
assert (rc == 0);
|
|
|
|
|
|
|
|
rc = zmq_bind (socket, "ipc:///tmp/recon");
|
|
|
|
assert (rc == 0);
|
|
|
|
|
|
|
|
usleep (200000);
|
|
|
|
|
|
|
|
memset (&buffer, 0, sizeof(buffer));
|
|
|
|
rc = zmq_recv (socket, &buffer, sizeof(buffer), ZMQ_DONTWAIT);
|
|
|
|
assert (rc != -1);
|
|
|
|
|
|
|
|
// Start closing the socket while the connecting process is underway.
|
|
|
|
rc = zmq_close (socket);
|
|
|
|
assert (rc == 0);
|
|
|
|
|
|
|
|
rc = zmq_term (context);
|
|
|
|
assert (rc == 0);
|
|
|
|
|
|
|
|
pthread_exit(NULL);
|
|
|
|
}
|
|
|
|
|
|
|
|
static void *worker (void *n)
|
|
|
|
{
|
|
|
|
void *socket, *context;
|
|
|
|
int rc, hadone, val;
|
|
|
|
|
|
|
|
context = zmq_init (1);
|
|
|
|
assert (context);
|
|
|
|
|
|
|
|
socket = zmq_socket (context, ZMQ_PUSH);
|
|
|
|
assert (socket);
|
|
|
|
|
|
|
|
val = 0;
|
|
|
|
rc = zmq_setsockopt(socket, ZMQ_LINGER, &val, sizeof(val));
|
|
|
|
assert (rc == 0);
|
|
|
|
|
|
|
|
val = 1;
|
|
|
|
rc = zmq_setsockopt (socket, ZMQ_DELAY_ATTACH_ON_CONNECT, &val, sizeof(val));
|
|
|
|
assert (rc == 0);
|
|
|
|
|
|
|
|
rc = zmq_connect (socket, "ipc:///tmp/recon");
|
|
|
|
assert (rc == 0);
|
|
|
|
|
|
|
|
hadone = 0;
|
|
|
|
// Not checking RC as some may be -1
|
|
|
|
for (int i = 0; i < 4; i++) {
|
|
|
|
usleep(200000);
|
|
|
|
rc = zmq_send (socket, "hi", 2, ZMQ_DONTWAIT);
|
|
|
|
if (rc != -1)
|
|
|
|
hadone ++;
|
|
|
|
}
|
|
|
|
|
|
|
|
assert (hadone >= 2);
|
|
|
|
assert (hadone < 4);
|
|
|
|
|
|
|
|
rc = zmq_close (socket);
|
|
|
|
assert (rc == 0);
|
|
|
|
|
|
|
|
rc = zmq_term (context);
|
|
|
|
assert (rc == 0);
|
|
|
|
|
|
|
|
pthread_exit(NULL);
|
|
|
|
}
|
|
|
|
|
After speaking with Ben Gray and the discussion on the mailing list, this is an attempt to create a sockopt to allow connecting pipes to not immediately be available for traffic. The problem is in a PUSH to many PULL situation, where there is a connect to a PULL which is not there. This connect will immediately create a pipe (unlike bind), and traffic will be load balanced to that pipe. This means if there is a persistently unavailable end point then the traffic will queue until HWM is hit, and older messages will be lost.
This patch adds a sockopt ZMQ_DELAY_ATTACH_ON_CONNECT, which if set to 1 will attempt to preempt this behavior. It does this by extending the use of the session_base to include in the outbound as well as the inbound pipe, and only associates the pipe with the socket once it receives the connected callback via a process_attach message. This works, and a test has been added to show so, but may introduce unexpected complications. The shutdown logic in this class has become marginally more awkward because of this, requiring the session to serve as the sink for both pipes if shutdown occurs with a still-connecting pipe in place. It is also possible there could be issues around flushing the messages, but as I could not directly think how to create such an issue I have not written any code with regards to that.
The documentation has been updated to reflect the change, but please do check over the code and test and review.
2012-06-01 18:58:19 +02:00
|
|
|
int main (int argc, char *argv [])
|
|
|
|
{
|
|
|
|
fprintf (stderr, "test_connect_delay running...\n");
|
|
|
|
int val;
|
|
|
|
int rc;
|
|
|
|
char buffer[16];
|
|
|
|
int seen = 0;
|
|
|
|
|
|
|
|
void *context = zmq_ctx_new();
|
2012-06-03 23:57:47 +02:00
|
|
|
assert (context);
|
After speaking with Ben Gray and the discussion on the mailing list, this is an attempt to create a sockopt to allow connecting pipes to not immediately be available for traffic. The problem is in a PUSH to many PULL situation, where there is a connect to a PULL which is not there. This connect will immediately create a pipe (unlike bind), and traffic will be load balanced to that pipe. This means if there is a persistently unavailable end point then the traffic will queue until HWM is hit, and older messages will be lost.
This patch adds a sockopt ZMQ_DELAY_ATTACH_ON_CONNECT, which if set to 1 will attempt to preempt this behavior. It does this by extending the use of the session_base to include in the outbound as well as the inbound pipe, and only associates the pipe with the socket once it receives the connected callback via a process_attach message. This works, and a test has been added to show so, but may introduce unexpected complications. The shutdown logic in this class has become marginally more awkward because of this, requiring the session to serve as the sink for both pipes if shutdown occurs with a still-connecting pipe in place. It is also possible there could be issues around flushing the messages, but as I could not directly think how to create such an issue I have not written any code with regards to that.
The documentation has been updated to reflect the change, but please do check over the code and test and review.
2012-06-01 18:58:19 +02:00
|
|
|
void *to = zmq_socket(context, ZMQ_PULL);
|
2012-06-03 23:57:47 +02:00
|
|
|
assert (to);
|
2012-06-04 00:11:08 +02:00
|
|
|
|
After speaking with Ben Gray and the discussion on the mailing list, this is an attempt to create a sockopt to allow connecting pipes to not immediately be available for traffic. The problem is in a PUSH to many PULL situation, where there is a connect to a PULL which is not there. This connect will immediately create a pipe (unlike bind), and traffic will be load balanced to that pipe. This means if there is a persistently unavailable end point then the traffic will queue until HWM is hit, and older messages will be lost.
This patch adds a sockopt ZMQ_DELAY_ATTACH_ON_CONNECT, which if set to 1 will attempt to preempt this behavior. It does this by extending the use of the session_base to include in the outbound as well as the inbound pipe, and only associates the pipe with the socket once it receives the connected callback via a process_attach message. This works, and a test has been added to show so, but may introduce unexpected complications. The shutdown logic in this class has become marginally more awkward because of this, requiring the session to serve as the sink for both pipes if shutdown occurs with a still-connecting pipe in place. It is also possible there could be issues around flushing the messages, but as I could not directly think how to create such an issue I have not written any code with regards to that.
The documentation has been updated to reflect the change, but please do check over the code and test and review.
2012-06-01 18:58:19 +02:00
|
|
|
val = 0;
|
2012-06-03 23:57:47 +02:00
|
|
|
rc = zmq_setsockopt(to, ZMQ_LINGER, &val, sizeof(val));
|
|
|
|
assert (rc == 0);
|
|
|
|
rc = zmq_bind(to, "tcp://*:5555");
|
|
|
|
assert (rc == 0);
|
After speaking with Ben Gray and the discussion on the mailing list, this is an attempt to create a sockopt to allow connecting pipes to not immediately be available for traffic. The problem is in a PUSH to many PULL situation, where there is a connect to a PULL which is not there. This connect will immediately create a pipe (unlike bind), and traffic will be load balanced to that pipe. This means if there is a persistently unavailable end point then the traffic will queue until HWM is hit, and older messages will be lost.
This patch adds a sockopt ZMQ_DELAY_ATTACH_ON_CONNECT, which if set to 1 will attempt to preempt this behavior. It does this by extending the use of the session_base to include in the outbound as well as the inbound pipe, and only associates the pipe with the socket once it receives the connected callback via a process_attach message. This works, and a test has been added to show so, but may introduce unexpected complications. The shutdown logic in this class has become marginally more awkward because of this, requiring the session to serve as the sink for both pipes if shutdown occurs with a still-connecting pipe in place. It is also possible there could be issues around flushing the messages, but as I could not directly think how to create such an issue I have not written any code with regards to that.
The documentation has been updated to reflect the change, but please do check over the code and test and review.
2012-06-01 18:58:19 +02:00
|
|
|
|
|
|
|
// Create a socket pushing to two endpoints - only 1 message should arrive.
|
|
|
|
void *from = zmq_socket (context, ZMQ_PUSH);
|
2012-06-03 23:57:47 +02:00
|
|
|
assert(from);
|
2012-06-04 00:11:08 +02:00
|
|
|
|
After speaking with Ben Gray and the discussion on the mailing list, this is an attempt to create a sockopt to allow connecting pipes to not immediately be available for traffic. The problem is in a PUSH to many PULL situation, where there is a connect to a PULL which is not there. This connect will immediately create a pipe (unlike bind), and traffic will be load balanced to that pipe. This means if there is a persistently unavailable end point then the traffic will queue until HWM is hit, and older messages will be lost.
This patch adds a sockopt ZMQ_DELAY_ATTACH_ON_CONNECT, which if set to 1 will attempt to preempt this behavior. It does this by extending the use of the session_base to include in the outbound as well as the inbound pipe, and only associates the pipe with the socket once it receives the connected callback via a process_attach message. This works, and a test has been added to show so, but may introduce unexpected complications. The shutdown logic in this class has become marginally more awkward because of this, requiring the session to serve as the sink for both pipes if shutdown occurs with a still-connecting pipe in place. It is also possible there could be issues around flushing the messages, but as I could not directly think how to create such an issue I have not written any code with regards to that.
The documentation has been updated to reflect the change, but please do check over the code and test and review.
2012-06-01 18:58:19 +02:00
|
|
|
val = 0;
|
2012-06-03 23:57:47 +02:00
|
|
|
zmq_setsockopt (from, ZMQ_LINGER, &val, sizeof(val));
|
|
|
|
rc = zmq_connect (from, "tcp://localhost:5556");
|
After speaking with Ben Gray and the discussion on the mailing list, this is an attempt to create a sockopt to allow connecting pipes to not immediately be available for traffic. The problem is in a PUSH to many PULL situation, where there is a connect to a PULL which is not there. This connect will immediately create a pipe (unlike bind), and traffic will be load balanced to that pipe. This means if there is a persistently unavailable end point then the traffic will queue until HWM is hit, and older messages will be lost.
This patch adds a sockopt ZMQ_DELAY_ATTACH_ON_CONNECT, which if set to 1 will attempt to preempt this behavior. It does this by extending the use of the session_base to include in the outbound as well as the inbound pipe, and only associates the pipe with the socket once it receives the connected callback via a process_attach message. This works, and a test has been added to show so, but may introduce unexpected complications. The shutdown logic in this class has become marginally more awkward because of this, requiring the session to serve as the sink for both pipes if shutdown occurs with a still-connecting pipe in place. It is also possible there could be issues around flushing the messages, but as I could not directly think how to create such an issue I have not written any code with regards to that.
The documentation has been updated to reflect the change, but please do check over the code and test and review.
2012-06-01 18:58:19 +02:00
|
|
|
assert (rc == 0);
|
2012-06-03 23:57:47 +02:00
|
|
|
rc = zmq_connect (from, "tcp://localhost:5555");
|
After speaking with Ben Gray and the discussion on the mailing list, this is an attempt to create a sockopt to allow connecting pipes to not immediately be available for traffic. The problem is in a PUSH to many PULL situation, where there is a connect to a PULL which is not there. This connect will immediately create a pipe (unlike bind), and traffic will be load balanced to that pipe. This means if there is a persistently unavailable end point then the traffic will queue until HWM is hit, and older messages will be lost.
This patch adds a sockopt ZMQ_DELAY_ATTACH_ON_CONNECT, which if set to 1 will attempt to preempt this behavior. It does this by extending the use of the session_base to include in the outbound as well as the inbound pipe, and only associates the pipe with the socket once it receives the connected callback via a process_attach message. This works, and a test has been added to show so, but may introduce unexpected complications. The shutdown logic in this class has become marginally more awkward because of this, requiring the session to serve as the sink for both pipes if shutdown occurs with a still-connecting pipe in place. It is also possible there could be issues around flushing the messages, but as I could not directly think how to create such an issue I have not written any code with regards to that.
The documentation has been updated to reflect the change, but please do check over the code and test and review.
2012-06-01 18:58:19 +02:00
|
|
|
assert (rc == 0);
|
|
|
|
|
|
|
|
for (int i = 0; i < 10; ++i)
|
|
|
|
{
|
|
|
|
std::string message("message ");
|
|
|
|
message += ('0' + i);
|
2012-06-03 23:57:47 +02:00
|
|
|
rc = zmq_send (from, message.data(), message.size(), 0);
|
|
|
|
assert(rc >= 0);
|
After speaking with Ben Gray and the discussion on the mailing list, this is an attempt to create a sockopt to allow connecting pipes to not immediately be available for traffic. The problem is in a PUSH to many PULL situation, where there is a connect to a PULL which is not there. This connect will immediately create a pipe (unlike bind), and traffic will be load balanced to that pipe. This means if there is a persistently unavailable end point then the traffic will queue until HWM is hit, and older messages will be lost.
This patch adds a sockopt ZMQ_DELAY_ATTACH_ON_CONNECT, which if set to 1 will attempt to preempt this behavior. It does this by extending the use of the session_base to include in the outbound as well as the inbound pipe, and only associates the pipe with the socket once it receives the connected callback via a process_attach message. This works, and a test has been added to show so, but may introduce unexpected complications. The shutdown logic in this class has become marginally more awkward because of this, requiring the session to serve as the sink for both pipes if shutdown occurs with a still-connecting pipe in place. It is also possible there could be issues around flushing the messages, but as I could not directly think how to create such an issue I have not written any code with regards to that.
The documentation has been updated to reflect the change, but please do check over the code and test and review.
2012-06-01 18:58:19 +02:00
|
|
|
}
|
2012-06-04 00:11:08 +02:00
|
|
|
|
After speaking with Ben Gray and the discussion on the mailing list, this is an attempt to create a sockopt to allow connecting pipes to not immediately be available for traffic. The problem is in a PUSH to many PULL situation, where there is a connect to a PULL which is not there. This connect will immediately create a pipe (unlike bind), and traffic will be load balanced to that pipe. This means if there is a persistently unavailable end point then the traffic will queue until HWM is hit, and older messages will be lost.
This patch adds a sockopt ZMQ_DELAY_ATTACH_ON_CONNECT, which if set to 1 will attempt to preempt this behavior. It does this by extending the use of the session_base to include in the outbound as well as the inbound pipe, and only associates the pipe with the socket once it receives the connected callback via a process_attach message. This works, and a test has been added to show so, but may introduce unexpected complications. The shutdown logic in this class has become marginally more awkward because of this, requiring the session to serve as the sink for both pipes if shutdown occurs with a still-connecting pipe in place. It is also possible there could be issues around flushing the messages, but as I could not directly think how to create such an issue I have not written any code with regards to that.
The documentation has been updated to reflect the change, but please do check over the code and test and review.
2012-06-01 18:58:19 +02:00
|
|
|
sleep(1);
|
|
|
|
seen = 0;
|
|
|
|
for (int i = 0; i < 10; ++i)
|
|
|
|
{
|
2012-06-11 21:04:09 +02:00
|
|
|
memset (&buffer, 0, sizeof(buffer));
|
2012-06-03 23:57:47 +02:00
|
|
|
rc = zmq_recv (to, &buffer, sizeof(buffer), ZMQ_DONTWAIT);
|
After speaking with Ben Gray and the discussion on the mailing list, this is an attempt to create a sockopt to allow connecting pipes to not immediately be available for traffic. The problem is in a PUSH to many PULL situation, where there is a connect to a PULL which is not there. This connect will immediately create a pipe (unlike bind), and traffic will be load balanced to that pipe. This means if there is a persistently unavailable end point then the traffic will queue until HWM is hit, and older messages will be lost.
This patch adds a sockopt ZMQ_DELAY_ATTACH_ON_CONNECT, which if set to 1 will attempt to preempt this behavior. It does this by extending the use of the session_base to include in the outbound as well as the inbound pipe, and only associates the pipe with the socket once it receives the connected callback via a process_attach message. This works, and a test has been added to show so, but may introduce unexpected complications. The shutdown logic in this class has become marginally more awkward because of this, requiring the session to serve as the sink for both pipes if shutdown occurs with a still-connecting pipe in place. It is also possible there could be issues around flushing the messages, but as I could not directly think how to create such an issue I have not written any code with regards to that.
The documentation has been updated to reflect the change, but please do check over the code and test and review.
2012-06-01 18:58:19 +02:00
|
|
|
if( rc == -1)
|
|
|
|
break;
|
|
|
|
seen++;
|
|
|
|
}
|
|
|
|
assert (seen == 5);
|
2012-06-04 00:11:08 +02:00
|
|
|
|
After speaking with Ben Gray and the discussion on the mailing list, this is an attempt to create a sockopt to allow connecting pipes to not immediately be available for traffic. The problem is in a PUSH to many PULL situation, where there is a connect to a PULL which is not there. This connect will immediately create a pipe (unlike bind), and traffic will be load balanced to that pipe. This means if there is a persistently unavailable end point then the traffic will queue until HWM is hit, and older messages will be lost.
This patch adds a sockopt ZMQ_DELAY_ATTACH_ON_CONNECT, which if set to 1 will attempt to preempt this behavior. It does this by extending the use of the session_base to include in the outbound as well as the inbound pipe, and only associates the pipe with the socket once it receives the connected callback via a process_attach message. This works, and a test has been added to show so, but may introduce unexpected complications. The shutdown logic in this class has become marginally more awkward because of this, requiring the session to serve as the sink for both pipes if shutdown occurs with a still-connecting pipe in place. It is also possible there could be issues around flushing the messages, but as I could not directly think how to create such an issue I have not written any code with regards to that.
The documentation has been updated to reflect the change, but please do check over the code and test and review.
2012-06-01 18:58:19 +02:00
|
|
|
rc = zmq_close (from);
|
|
|
|
assert (rc == 0);
|
2012-06-04 00:11:08 +02:00
|
|
|
|
After speaking with Ben Gray and the discussion on the mailing list, this is an attempt to create a sockopt to allow connecting pipes to not immediately be available for traffic. The problem is in a PUSH to many PULL situation, where there is a connect to a PULL which is not there. This connect will immediately create a pipe (unlike bind), and traffic will be load balanced to that pipe. This means if there is a persistently unavailable end point then the traffic will queue until HWM is hit, and older messages will be lost.
This patch adds a sockopt ZMQ_DELAY_ATTACH_ON_CONNECT, which if set to 1 will attempt to preempt this behavior. It does this by extending the use of the session_base to include in the outbound as well as the inbound pipe, and only associates the pipe with the socket once it receives the connected callback via a process_attach message. This works, and a test has been added to show so, but may introduce unexpected complications. The shutdown logic in this class has become marginally more awkward because of this, requiring the session to serve as the sink for both pipes if shutdown occurs with a still-connecting pipe in place. It is also possible there could be issues around flushing the messages, but as I could not directly think how to create such an issue I have not written any code with regards to that.
The documentation has been updated to reflect the change, but please do check over the code and test and review.
2012-06-01 18:58:19 +02:00
|
|
|
rc = zmq_close (to);
|
|
|
|
assert (rc == 0);
|
2012-06-04 00:11:08 +02:00
|
|
|
|
After speaking with Ben Gray and the discussion on the mailing list, this is an attempt to create a sockopt to allow connecting pipes to not immediately be available for traffic. The problem is in a PUSH to many PULL situation, where there is a connect to a PULL which is not there. This connect will immediately create a pipe (unlike bind), and traffic will be load balanced to that pipe. This means if there is a persistently unavailable end point then the traffic will queue until HWM is hit, and older messages will be lost.
This patch adds a sockopt ZMQ_DELAY_ATTACH_ON_CONNECT, which if set to 1 will attempt to preempt this behavior. It does this by extending the use of the session_base to include in the outbound as well as the inbound pipe, and only associates the pipe with the socket once it receives the connected callback via a process_attach message. This works, and a test has been added to show so, but may introduce unexpected complications. The shutdown logic in this class has become marginally more awkward because of this, requiring the session to serve as the sink for both pipes if shutdown occurs with a still-connecting pipe in place. It is also possible there could be issues around flushing the messages, but as I could not directly think how to create such an issue I have not written any code with regards to that.
The documentation has been updated to reflect the change, but please do check over the code and test and review.
2012-06-01 18:58:19 +02:00
|
|
|
rc = zmq_ctx_destroy(context);
|
|
|
|
assert (rc == 0);
|
2012-06-04 00:11:08 +02:00
|
|
|
|
After speaking with Ben Gray and the discussion on the mailing list, this is an attempt to create a sockopt to allow connecting pipes to not immediately be available for traffic. The problem is in a PUSH to many PULL situation, where there is a connect to a PULL which is not there. This connect will immediately create a pipe (unlike bind), and traffic will be load balanced to that pipe. This means if there is a persistently unavailable end point then the traffic will queue until HWM is hit, and older messages will be lost.
This patch adds a sockopt ZMQ_DELAY_ATTACH_ON_CONNECT, which if set to 1 will attempt to preempt this behavior. It does this by extending the use of the session_base to include in the outbound as well as the inbound pipe, and only associates the pipe with the socket once it receives the connected callback via a process_attach message. This works, and a test has been added to show so, but may introduce unexpected complications. The shutdown logic in this class has become marginally more awkward because of this, requiring the session to serve as the sink for both pipes if shutdown occurs with a still-connecting pipe in place. It is also possible there could be issues around flushing the messages, but as I could not directly think how to create such an issue I have not written any code with regards to that.
The documentation has been updated to reflect the change, but please do check over the code and test and review.
2012-06-01 18:58:19 +02:00
|
|
|
context = zmq_ctx_new();
|
2012-06-04 00:11:08 +02:00
|
|
|
fprintf (stderr, " Rerunning with DELAY_ATTACH_ON_CONNECT\n");
|
|
|
|
|
2012-06-03 23:57:47 +02:00
|
|
|
to = zmq_socket (context, ZMQ_PULL);
|
|
|
|
assert (to);
|
|
|
|
rc = zmq_bind (to, "tcp://*:5560");
|
2012-06-04 00:11:08 +02:00
|
|
|
assert (rc == 0);
|
|
|
|
|
After speaking with Ben Gray and the discussion on the mailing list, this is an attempt to create a sockopt to allow connecting pipes to not immediately be available for traffic. The problem is in a PUSH to many PULL situation, where there is a connect to a PULL which is not there. This connect will immediately create a pipe (unlike bind), and traffic will be load balanced to that pipe. This means if there is a persistently unavailable end point then the traffic will queue until HWM is hit, and older messages will be lost.
This patch adds a sockopt ZMQ_DELAY_ATTACH_ON_CONNECT, which if set to 1 will attempt to preempt this behavior. It does this by extending the use of the session_base to include in the outbound as well as the inbound pipe, and only associates the pipe with the socket once it receives the connected callback via a process_attach message. This works, and a test has been added to show so, but may introduce unexpected complications. The shutdown logic in this class has become marginally more awkward because of this, requiring the session to serve as the sink for both pipes if shutdown occurs with a still-connecting pipe in place. It is also possible there could be issues around flushing the messages, but as I could not directly think how to create such an issue I have not written any code with regards to that.
The documentation has been updated to reflect the change, but please do check over the code and test and review.
2012-06-01 18:58:19 +02:00
|
|
|
val = 0;
|
2012-06-03 23:57:47 +02:00
|
|
|
rc = zmq_setsockopt (to, ZMQ_LINGER, &val, sizeof(val));
|
|
|
|
assert (rc == 0);
|
2012-06-04 00:11:08 +02:00
|
|
|
|
After speaking with Ben Gray and the discussion on the mailing list, this is an attempt to create a sockopt to allow connecting pipes to not immediately be available for traffic. The problem is in a PUSH to many PULL situation, where there is a connect to a PULL which is not there. This connect will immediately create a pipe (unlike bind), and traffic will be load balanced to that pipe. This means if there is a persistently unavailable end point then the traffic will queue until HWM is hit, and older messages will be lost.
This patch adds a sockopt ZMQ_DELAY_ATTACH_ON_CONNECT, which if set to 1 will attempt to preempt this behavior. It does this by extending the use of the session_base to include in the outbound as well as the inbound pipe, and only associates the pipe with the socket once it receives the connected callback via a process_attach message. This works, and a test has been added to show so, but may introduce unexpected complications. The shutdown logic in this class has become marginally more awkward because of this, requiring the session to serve as the sink for both pipes if shutdown occurs with a still-connecting pipe in place. It is also possible there could be issues around flushing the messages, but as I could not directly think how to create such an issue I have not written any code with regards to that.
The documentation has been updated to reflect the change, but please do check over the code and test and review.
2012-06-01 18:58:19 +02:00
|
|
|
// Create a socket pushing to two endpoints - all messages should arrive.
|
|
|
|
from = zmq_socket (context, ZMQ_PUSH);
|
2012-06-03 23:57:47 +02:00
|
|
|
assert (from);
|
2012-06-04 00:11:08 +02:00
|
|
|
|
After speaking with Ben Gray and the discussion on the mailing list, this is an attempt to create a sockopt to allow connecting pipes to not immediately be available for traffic. The problem is in a PUSH to many PULL situation, where there is a connect to a PULL which is not there. This connect will immediately create a pipe (unlike bind), and traffic will be load balanced to that pipe. This means if there is a persistently unavailable end point then the traffic will queue until HWM is hit, and older messages will be lost.
This patch adds a sockopt ZMQ_DELAY_ATTACH_ON_CONNECT, which if set to 1 will attempt to preempt this behavior. It does this by extending the use of the session_base to include in the outbound as well as the inbound pipe, and only associates the pipe with the socket once it receives the connected callback via a process_attach message. This works, and a test has been added to show so, but may introduce unexpected complications. The shutdown logic in this class has become marginally more awkward because of this, requiring the session to serve as the sink for both pipes if shutdown occurs with a still-connecting pipe in place. It is also possible there could be issues around flushing the messages, but as I could not directly think how to create such an issue I have not written any code with regards to that.
The documentation has been updated to reflect the change, but please do check over the code and test and review.
2012-06-01 18:58:19 +02:00
|
|
|
val = 0;
|
2012-06-03 23:57:47 +02:00
|
|
|
rc = zmq_setsockopt (from, ZMQ_LINGER, &val, sizeof(val));
|
|
|
|
assert (rc == 0);
|
2012-06-04 00:11:08 +02:00
|
|
|
|
After speaking with Ben Gray and the discussion on the mailing list, this is an attempt to create a sockopt to allow connecting pipes to not immediately be available for traffic. The problem is in a PUSH to many PULL situation, where there is a connect to a PULL which is not there. This connect will immediately create a pipe (unlike bind), and traffic will be load balanced to that pipe. This means if there is a persistently unavailable end point then the traffic will queue until HWM is hit, and older messages will be lost.
This patch adds a sockopt ZMQ_DELAY_ATTACH_ON_CONNECT, which if set to 1 will attempt to preempt this behavior. It does this by extending the use of the session_base to include in the outbound as well as the inbound pipe, and only associates the pipe with the socket once it receives the connected callback via a process_attach message. This works, and a test has been added to show so, but may introduce unexpected complications. The shutdown logic in this class has become marginally more awkward because of this, requiring the session to serve as the sink for both pipes if shutdown occurs with a still-connecting pipe in place. It is also possible there could be issues around flushing the messages, but as I could not directly think how to create such an issue I have not written any code with regards to that.
The documentation has been updated to reflect the change, but please do check over the code and test and review.
2012-06-01 18:58:19 +02:00
|
|
|
val = 1;
|
2012-06-03 23:57:47 +02:00
|
|
|
rc = zmq_setsockopt (from, ZMQ_DELAY_ATTACH_ON_CONNECT, &val, sizeof(val));
|
After speaking with Ben Gray and the discussion on the mailing list, this is an attempt to create a sockopt to allow connecting pipes to not immediately be available for traffic. The problem is in a PUSH to many PULL situation, where there is a connect to a PULL which is not there. This connect will immediately create a pipe (unlike bind), and traffic will be load balanced to that pipe. This means if there is a persistently unavailable end point then the traffic will queue until HWM is hit, and older messages will be lost.
This patch adds a sockopt ZMQ_DELAY_ATTACH_ON_CONNECT, which if set to 1 will attempt to preempt this behavior. It does this by extending the use of the session_base to include in the outbound as well as the inbound pipe, and only associates the pipe with the socket once it receives the connected callback via a process_attach message. This works, and a test has been added to show so, but may introduce unexpected complications. The shutdown logic in this class has become marginally more awkward because of this, requiring the session to serve as the sink for both pipes if shutdown occurs with a still-connecting pipe in place. It is also possible there could be issues around flushing the messages, but as I could not directly think how to create such an issue I have not written any code with regards to that.
The documentation has been updated to reflect the change, but please do check over the code and test and review.
2012-06-01 18:58:19 +02:00
|
|
|
assert (rc == 0);
|
2012-06-04 00:11:08 +02:00
|
|
|
|
2012-06-03 23:57:47 +02:00
|
|
|
rc = zmq_connect (from, "tcp://localhost:5561");
|
|
|
|
assert (rc == 0);
|
2012-06-04 00:11:08 +02:00
|
|
|
|
2012-06-03 23:57:47 +02:00
|
|
|
rc = zmq_connect (from, "tcp://localhost:5560");
|
After speaking with Ben Gray and the discussion on the mailing list, this is an attempt to create a sockopt to allow connecting pipes to not immediately be available for traffic. The problem is in a PUSH to many PULL situation, where there is a connect to a PULL which is not there. This connect will immediately create a pipe (unlike bind), and traffic will be load balanced to that pipe. This means if there is a persistently unavailable end point then the traffic will queue until HWM is hit, and older messages will be lost.
This patch adds a sockopt ZMQ_DELAY_ATTACH_ON_CONNECT, which if set to 1 will attempt to preempt this behavior. It does this by extending the use of the session_base to include in the outbound as well as the inbound pipe, and only associates the pipe with the socket once it receives the connected callback via a process_attach message. This works, and a test has been added to show so, but may introduce unexpected complications. The shutdown logic in this class has become marginally more awkward because of this, requiring the session to serve as the sink for both pipes if shutdown occurs with a still-connecting pipe in place. It is also possible there could be issues around flushing the messages, but as I could not directly think how to create such an issue I have not written any code with regards to that.
The documentation has been updated to reflect the change, but please do check over the code and test and review.
2012-06-01 18:58:19 +02:00
|
|
|
assert (rc == 0);
|
|
|
|
|
|
|
|
for (int i = 0; i < 10; ++i)
|
|
|
|
{
|
|
|
|
std::string message("message ");
|
|
|
|
message += ('0' + i);
|
2012-06-03 23:57:47 +02:00
|
|
|
rc = zmq_send (from, message.data(), message.size(), 0);
|
|
|
|
assert (rc >= 0);
|
After speaking with Ben Gray and the discussion on the mailing list, this is an attempt to create a sockopt to allow connecting pipes to not immediately be available for traffic. The problem is in a PUSH to many PULL situation, where there is a connect to a PULL which is not there. This connect will immediately create a pipe (unlike bind), and traffic will be load balanced to that pipe. This means if there is a persistently unavailable end point then the traffic will queue until HWM is hit, and older messages will be lost.
This patch adds a sockopt ZMQ_DELAY_ATTACH_ON_CONNECT, which if set to 1 will attempt to preempt this behavior. It does this by extending the use of the session_base to include in the outbound as well as the inbound pipe, and only associates the pipe with the socket once it receives the connected callback via a process_attach message. This works, and a test has been added to show so, but may introduce unexpected complications. The shutdown logic in this class has become marginally more awkward because of this, requiring the session to serve as the sink for both pipes if shutdown occurs with a still-connecting pipe in place. It is also possible there could be issues around flushing the messages, but as I could not directly think how to create such an issue I have not written any code with regards to that.
The documentation has been updated to reflect the change, but please do check over the code and test and review.
2012-06-01 18:58:19 +02:00
|
|
|
}
|
2012-06-04 00:11:08 +02:00
|
|
|
|
After speaking with Ben Gray and the discussion on the mailing list, this is an attempt to create a sockopt to allow connecting pipes to not immediately be available for traffic. The problem is in a PUSH to many PULL situation, where there is a connect to a PULL which is not there. This connect will immediately create a pipe (unlike bind), and traffic will be load balanced to that pipe. This means if there is a persistently unavailable end point then the traffic will queue until HWM is hit, and older messages will be lost.
This patch adds a sockopt ZMQ_DELAY_ATTACH_ON_CONNECT, which if set to 1 will attempt to preempt this behavior. It does this by extending the use of the session_base to include in the outbound as well as the inbound pipe, and only associates the pipe with the socket once it receives the connected callback via a process_attach message. This works, and a test has been added to show so, but may introduce unexpected complications. The shutdown logic in this class has become marginally more awkward because of this, requiring the session to serve as the sink for both pipes if shutdown occurs with a still-connecting pipe in place. It is also possible there could be issues around flushing the messages, but as I could not directly think how to create such an issue I have not written any code with regards to that.
The documentation has been updated to reflect the change, but please do check over the code and test and review.
2012-06-01 18:58:19 +02:00
|
|
|
sleep(1);
|
2012-06-04 00:11:08 +02:00
|
|
|
|
After speaking with Ben Gray and the discussion on the mailing list, this is an attempt to create a sockopt to allow connecting pipes to not immediately be available for traffic. The problem is in a PUSH to many PULL situation, where there is a connect to a PULL which is not there. This connect will immediately create a pipe (unlike bind), and traffic will be load balanced to that pipe. This means if there is a persistently unavailable end point then the traffic will queue until HWM is hit, and older messages will be lost.
This patch adds a sockopt ZMQ_DELAY_ATTACH_ON_CONNECT, which if set to 1 will attempt to preempt this behavior. It does this by extending the use of the session_base to include in the outbound as well as the inbound pipe, and only associates the pipe with the socket once it receives the connected callback via a process_attach message. This works, and a test has been added to show so, but may introduce unexpected complications. The shutdown logic in this class has become marginally more awkward because of this, requiring the session to serve as the sink for both pipes if shutdown occurs with a still-connecting pipe in place. It is also possible there could be issues around flushing the messages, but as I could not directly think how to create such an issue I have not written any code with regards to that.
The documentation has been updated to reflect the change, but please do check over the code and test and review.
2012-06-01 18:58:19 +02:00
|
|
|
seen = 0;
|
|
|
|
for (int i = 0; i < 10; ++i)
|
|
|
|
{
|
|
|
|
memset(&buffer, 0, sizeof(buffer));
|
2012-06-03 23:57:47 +02:00
|
|
|
rc = zmq_recv (to, &buffer, sizeof(buffer), ZMQ_DONTWAIT);
|
|
|
|
assert (rc != -1);
|
After speaking with Ben Gray and the discussion on the mailing list, this is an attempt to create a sockopt to allow connecting pipes to not immediately be available for traffic. The problem is in a PUSH to many PULL situation, where there is a connect to a PULL which is not there. This connect will immediately create a pipe (unlike bind), and traffic will be load balanced to that pipe. This means if there is a persistently unavailable end point then the traffic will queue until HWM is hit, and older messages will be lost.
This patch adds a sockopt ZMQ_DELAY_ATTACH_ON_CONNECT, which if set to 1 will attempt to preempt this behavior. It does this by extending the use of the session_base to include in the outbound as well as the inbound pipe, and only associates the pipe with the socket once it receives the connected callback via a process_attach message. This works, and a test has been added to show so, but may introduce unexpected complications. The shutdown logic in this class has become marginally more awkward because of this, requiring the session to serve as the sink for both pipes if shutdown occurs with a still-connecting pipe in place. It is also possible there could be issues around flushing the messages, but as I could not directly think how to create such an issue I have not written any code with regards to that.
The documentation has been updated to reflect the change, but please do check over the code and test and review.
2012-06-01 18:58:19 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
rc = zmq_close (from);
|
|
|
|
assert (rc == 0);
|
2012-06-04 00:11:08 +02:00
|
|
|
|
After speaking with Ben Gray and the discussion on the mailing list, this is an attempt to create a sockopt to allow connecting pipes to not immediately be available for traffic. The problem is in a PUSH to many PULL situation, where there is a connect to a PULL which is not there. This connect will immediately create a pipe (unlike bind), and traffic will be load balanced to that pipe. This means if there is a persistently unavailable end point then the traffic will queue until HWM is hit, and older messages will be lost.
This patch adds a sockopt ZMQ_DELAY_ATTACH_ON_CONNECT, which if set to 1 will attempt to preempt this behavior. It does this by extending the use of the session_base to include in the outbound as well as the inbound pipe, and only associates the pipe with the socket once it receives the connected callback via a process_attach message. This works, and a test has been added to show so, but may introduce unexpected complications. The shutdown logic in this class has become marginally more awkward because of this, requiring the session to serve as the sink for both pipes if shutdown occurs with a still-connecting pipe in place. It is also possible there could be issues around flushing the messages, but as I could not directly think how to create such an issue I have not written any code with regards to that.
The documentation has been updated to reflect the change, but please do check over the code and test and review.
2012-06-01 18:58:19 +02:00
|
|
|
rc = zmq_close (to);
|
|
|
|
assert (rc == 0);
|
2012-06-11 21:04:09 +02:00
|
|
|
|
After speaking with Ben Gray and the discussion on the mailing list, this is an attempt to create a sockopt to allow connecting pipes to not immediately be available for traffic. The problem is in a PUSH to many PULL situation, where there is a connect to a PULL which is not there. This connect will immediately create a pipe (unlike bind), and traffic will be load balanced to that pipe. This means if there is a persistently unavailable end point then the traffic will queue until HWM is hit, and older messages will be lost.
This patch adds a sockopt ZMQ_DELAY_ATTACH_ON_CONNECT, which if set to 1 will attempt to preempt this behavior. It does this by extending the use of the session_base to include in the outbound as well as the inbound pipe, and only associates the pipe with the socket once it receives the connected callback via a process_attach message. This works, and a test has been added to show so, but may introduce unexpected complications. The shutdown logic in this class has become marginally more awkward because of this, requiring the session to serve as the sink for both pipes if shutdown occurs with a still-connecting pipe in place. It is also possible there could be issues around flushing the messages, but as I could not directly think how to create such an issue I have not written any code with regards to that.
The documentation has been updated to reflect the change, but please do check over the code and test and review.
2012-06-01 18:58:19 +02:00
|
|
|
rc = zmq_ctx_destroy(context);
|
|
|
|
assert (rc == 0);
|
|
|
|
|
2012-06-11 21:04:09 +02:00
|
|
|
fprintf (stderr, " Running DELAY_ATTACH_ON_CONNECT with disconnect\n");
|
|
|
|
|
|
|
|
pthread_t serv, work;
|
|
|
|
|
|
|
|
rc = pthread_create (&serv, NULL, server, NULL);
|
|
|
|
assert (rc == 0);
|
|
|
|
|
|
|
|
rc = pthread_create (&work, NULL, worker, NULL);
|
|
|
|
assert (rc == 0);
|
|
|
|
|
|
|
|
pthread_exit(NULL);
|
After speaking with Ben Gray and the discussion on the mailing list, this is an attempt to create a sockopt to allow connecting pipes to not immediately be available for traffic. The problem is in a PUSH to many PULL situation, where there is a connect to a PULL which is not there. This connect will immediately create a pipe (unlike bind), and traffic will be load balanced to that pipe. This means if there is a persistently unavailable end point then the traffic will queue until HWM is hit, and older messages will be lost.
This patch adds a sockopt ZMQ_DELAY_ATTACH_ON_CONNECT, which if set to 1 will attempt to preempt this behavior. It does this by extending the use of the session_base to include in the outbound as well as the inbound pipe, and only associates the pipe with the socket once it receives the connected callback via a process_attach message. This works, and a test has been added to show so, but may introduce unexpected complications. The shutdown logic in this class has become marginally more awkward because of this, requiring the session to serve as the sink for both pipes if shutdown occurs with a still-connecting pipe in place. It is also possible there could be issues around flushing the messages, but as I could not directly think how to create such an issue I have not written any code with regards to that.
The documentation has been updated to reflect the change, but please do check over the code and test and review.
2012-06-01 18:58:19 +02:00
|
|
|
}
|