Merge pull request #2352 from reza-ebrahimi/master

Commit: Issue #2348
This commit is contained in:
Doron Somech 2017-02-24 19:33:13 +02:00 committed by GitHub
commit ca311f7bfe
3 changed files with 38 additions and 7 deletions

View File

@ -212,15 +212,30 @@ int zmq::router_t::xsend (msg_t *msg_)
if (it != outpipes.end ()) { if (it != outpipes.end ()) {
current_out = it->second.pipe; current_out = it->second.pipe;
if (!current_out->check_write ()) {
// Check whether pipe is full or not
if (!current_out->check_hwm()) {
it->second.active = false; it->second.active = false;
current_out = NULL; current_out = NULL;
if (mandatory) { if (mandatory) {
more_out = false; more_out = false;
errno = EAGAIN; errno = EAGAIN;
return -1; return -1;
} }
} }
// Check whether pipe is closed or not
else
if (!current_out->check_write()) {
it->second.active = false;
current_out = NULL;
if (mandatory) {
more_out = false;
errno = EHOSTUNREACH;
return -1;
}
}
} }
else else
if (mandatory) { if (mandatory) {

View File

@ -1120,6 +1120,12 @@ int zmq::socket_base_t::send (msg_t *msg_, int flags_)
if (rc == 0) { if (rc == 0) {
return 0; return 0;
} }
// In case of ZMQ_ROUTER_MANDATORY option set and peer disconnected
if (likely(errno == EHOSTUNREACH)) {
return -1;
}
if (unlikely(errno != EAGAIN)) { if (unlikely(errno != EAGAIN)) {
return -1; return -1;
} }

View File

@ -84,11 +84,15 @@ int main (void)
const int BUF_SIZE = 65536; const int BUF_SIZE = 65536;
char buf[BUF_SIZE]; char buf[BUF_SIZE];
memset(buf, 0, BUF_SIZE); memset(buf, 0, BUF_SIZE);
// Send first batch of messages // Send first batch of messages
for(i = 0; i < 100000; ++i) { for(i = 0; i < 100000; ++i) {
if (TRACE_ENABLED) fprintf(stderr, "Sending message %d ...\n", i); if (TRACE_ENABLED)
fprintf(stderr, "Sending message %d ...\n", i);
rc = zmq_send (router, "X", 1, ZMQ_DONTWAIT | ZMQ_SNDMORE); rc = zmq_send (router, "X", 1, ZMQ_DONTWAIT | ZMQ_SNDMORE);
if (rc == -1 && zmq_errno() == EAGAIN) break; if (rc == -1 && zmq_errno() == EHOSTUNREACH)
break;
assert (rc == 1); assert (rc == 1);
rc = zmq_send (router, buf, BUF_SIZE, ZMQ_DONTWAIT); rc = zmq_send (router, buf, BUF_SIZE, ZMQ_DONTWAIT);
assert (rc == BUF_SIZE); assert (rc == BUF_SIZE);
@ -96,12 +100,17 @@ int main (void)
// This should fail after one message but kernel buffering could // This should fail after one message but kernel buffering could
// skew results // skew results
assert (i < 10); assert (i < 10);
msleep (1000); msleep (1000);
// Send second batch of messages // Send second batch of messages
for(; i < 100000; ++i) { for(; i < 100000; ++i) {
if (TRACE_ENABLED) fprintf(stderr, "Sending message %d (part 2) ...\n", i); if (TRACE_ENABLED)
fprintf(stderr, "Sending message %d (part 2) ...\n", i);
rc = zmq_send (router, "X", 1, ZMQ_DONTWAIT | ZMQ_SNDMORE); rc = zmq_send (router, "X", 1, ZMQ_DONTWAIT | ZMQ_SNDMORE);
if (rc == -1 && zmq_errno() == EAGAIN) break; if (rc == -1 && zmq_errno() == EHOSTUNREACH)
break;
assert (rc == 1); assert (rc == 1);
rc = zmq_send (router, buf, BUF_SIZE, ZMQ_DONTWAIT); rc = zmq_send (router, buf, BUF_SIZE, ZMQ_DONTWAIT);
assert (rc == BUF_SIZE); assert (rc == BUF_SIZE);
@ -110,7 +119,8 @@ int main (void)
// skew results // skew results
assert (i < 20); assert (i < 20);
if (TRACE_ENABLED) fprintf(stderr, "Done sending messages.\n"); if (TRACE_ENABLED)
fprintf(stderr, "Done sending messages.\n");
rc = zmq_close (router); rc = zmq_close (router);
assert (rc == 0); assert (rc == 0);