Merge pull request #121 from minrk/pollout

fix scope of itemsout poll
This commit is contained in:
Pieter Hintjens 2015-06-05 23:23:18 +02:00
commit d4994cfe1c
2 changed files with 25 additions and 6 deletions

View File

@ -93,18 +93,18 @@ int zmq::proxy (
if (unlikely (rc < 0)) if (unlikely (rc < 0))
return -1; return -1;
// Process a control command if any
if (control_ && items [2].revents & ZMQ_POLLIN) {
rc = control_->recv (&msg, 0);
if (unlikely (rc < 0))
return -1;
// Get the pollout separately because when combining this with pollin it maxes the CPU // Get the pollout separately because when combining this with pollin it maxes the CPU
// because pollout shall most of the time return directly // because pollout shall most of the time return directly
rc = zmq_poll (&itemsout [0], 2, 0); rc = zmq_poll (&itemsout [0], 2, 0);
if (unlikely (rc < 0)) if (unlikely (rc < 0))
return -1; return -1;
// Process a control command if any
if (control_ && items [2].revents & ZMQ_POLLIN) {
rc = control_->recv (&msg, 0);
if (unlikely (rc < 0))
return -1;
moresz = sizeof more; moresz = sizeof more;
rc = control_->getsockopt (ZMQ_RCVMORE, &more, &moresz); rc = control_->getsockopt (ZMQ_RCVMORE, &more, &moresz);
if (unlikely (rc < 0) || more) if (unlikely (rc < 0) || more)

View File

@ -86,22 +86,41 @@ int main (void)
rc = zmq_connect (publisher, "tcp://127.0.0.1:15564"); rc = zmq_connect (publisher, "tcp://127.0.0.1:15564");
assert (rc == 0); assert (rc == 0);
// Start a secondary puller which reads the data out the other end
char buf[255];
void *puller = zmq_socket (ctx, ZMQ_PULL);
assert (puller);
rc = zmq_connect (puller, "tcp://127.0.0.1:15563");
assert (rc == 0);
msleep (50); msleep (50);
rc = zmq_send (publisher, "This is a test", 14, 0); rc = zmq_send (publisher, "This is a test", 14, 0);
assert (rc == 14); assert (rc == 14);
rc = zmq_recv (puller, buf, 255, 0);
assert (rc == 14);
msleep (50); msleep (50);
rc = zmq_send (publisher, "This is a test", 14, 0); rc = zmq_send (publisher, "This is a test", 14, 0);
assert (rc == 14); assert (rc == 14);
rc = zmq_recv (puller, buf, 255, 0);
assert (rc == 14);
msleep (50); msleep (50);
rc = zmq_send (publisher, "This is a test", 14, 0); rc = zmq_send (publisher, "This is a test", 14, 0);
assert (rc == 14); assert (rc == 14);
rc = zmq_recv (puller, buf, 255, 0);
assert (rc == 14);
rc = zmq_send (control, "TERMINATE", 9, 0); rc = zmq_send (control, "TERMINATE", 9, 0);
assert (rc == 9); assert (rc == 9);
rc = zmq_close (publisher); rc = zmq_close (publisher);
assert (rc == 0); assert (rc == 0);
rc = zmq_close (puller);
assert (rc == 0);
rc = zmq_close (control); rc = zmq_close (control);
assert (rc == 0); assert (rc == 0);