mirror of
https://github.com/zeromq/libzmq.git
synced 2025-01-17 04:50:57 +08:00
Merge pull request #583 from ianbarber/master
Small stream engine issue and test_monitor tidy up
This commit is contained in:
commit
9eb2521537
@ -660,6 +660,12 @@ void zmq::stream_engine_t::mechanism_ready ()
|
|||||||
msg_t identity;
|
msg_t identity;
|
||||||
mechanism->peer_identity (&identity);
|
mechanism->peer_identity (&identity);
|
||||||
const int rc = session->push_msg (&identity);
|
const int rc = session->push_msg (&identity);
|
||||||
|
if (rc == -1 && errno == EAGAIN) {
|
||||||
|
// If the write is failing at this stage with
|
||||||
|
// an EAGAIN the pipe must be being shut down,
|
||||||
|
// so we can just bail out of the identity set.
|
||||||
|
return;
|
||||||
|
}
|
||||||
errno_assert (rc == 0);
|
errno_assert (rc == 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -41,12 +41,14 @@ static bool read_msg(void* s, zmq_event_t& event, std::string& ep)
|
|||||||
zmq_msg_init (&msg2);
|
zmq_msg_init (&msg2);
|
||||||
rc = zmq_msg_recv (&msg1, s, 0);
|
rc = zmq_msg_recv (&msg1, s, 0);
|
||||||
if (rc == -1 && zmq_errno() == ETERM)
|
if (rc == -1 && zmq_errno() == ETERM)
|
||||||
return true ;
|
return true ;
|
||||||
|
|
||||||
assert (rc != -1);
|
assert (rc != -1);
|
||||||
assert (zmq_msg_more(&msg1) != 0);
|
assert (zmq_msg_more(&msg1) != 0);
|
||||||
rc = zmq_msg_recv (&msg2, s, 0);
|
rc = zmq_msg_recv (&msg2, s, 0);
|
||||||
if (rc == -1 && zmq_errno() == ETERM)
|
if (rc == -1 && zmq_errno() == ETERM)
|
||||||
return true;
|
return true;
|
||||||
|
|
||||||
assert (rc != -1);
|
assert (rc != -1);
|
||||||
assert (zmq_msg_more(&msg2) == 0);
|
assert (zmq_msg_more(&msg2) == 0);
|
||||||
// copy binary data to event struct
|
// copy binary data to event struct
|
||||||
@ -72,7 +74,7 @@ static void *req_socket_monitor (void *ctx)
|
|||||||
rc = zmq_connect (s, "inproc://monitor.req");
|
rc = zmq_connect (s, "inproc://monitor.req");
|
||||||
assert (rc == 0);
|
assert (rc == 0);
|
||||||
while (!read_msg(s, event, ep)) {
|
while (!read_msg(s, event, ep)) {
|
||||||
assert (ep == addr);
|
assert (ep == addr);
|
||||||
switch (event.event) {
|
switch (event.event) {
|
||||||
case ZMQ_EVENT_CONNECTED:
|
case ZMQ_EVENT_CONNECTED:
|
||||||
assert (event.value > 0);
|
assert (event.value > 0);
|
||||||
@ -114,7 +116,7 @@ static void *req2_socket_monitor (void *ctx)
|
|||||||
rc = zmq_connect (s, "inproc://monitor.req2");
|
rc = zmq_connect (s, "inproc://monitor.req2");
|
||||||
assert (rc == 0);
|
assert (rc == 0);
|
||||||
while (!read_msg(s, event, ep)) {
|
while (!read_msg(s, event, ep)) {
|
||||||
assert (ep == addr);
|
assert (ep == addr);
|
||||||
switch (event.event) {
|
switch (event.event) {
|
||||||
case ZMQ_EVENT_CONNECTED:
|
case ZMQ_EVENT_CONNECTED:
|
||||||
assert (event.value > 0);
|
assert (event.value > 0);
|
||||||
@ -143,7 +145,7 @@ static void *rep_socket_monitor (void *ctx)
|
|||||||
rc = zmq_connect (s, "inproc://monitor.rep");
|
rc = zmq_connect (s, "inproc://monitor.rep");
|
||||||
assert (rc == 0);
|
assert (rc == 0);
|
||||||
while (!read_msg(s, event, ep)) {
|
while (!read_msg(s, event, ep)) {
|
||||||
assert (ep == addr);
|
assert (ep == addr);
|
||||||
switch (event.event) {
|
switch (event.event) {
|
||||||
case ZMQ_EVENT_LISTENING:
|
case ZMQ_EVENT_LISTENING:
|
||||||
assert (event.value > 0);
|
assert (event.value > 0);
|
||||||
@ -203,10 +205,7 @@ int main (void)
|
|||||||
assert (rc == 0);
|
assert (rc == 0);
|
||||||
rc = pthread_create (&threads [0], NULL, rep_socket_monitor, ctx);
|
rc = pthread_create (&threads [0], NULL, rep_socket_monitor, ctx);
|
||||||
assert (rc == 0);
|
assert (rc == 0);
|
||||||
|
|
||||||
rc = zmq_bind (rep, addr.c_str());
|
|
||||||
assert (rc == 0);
|
|
||||||
|
|
||||||
// REQ socket
|
// REQ socket
|
||||||
req = zmq_socket (ctx, ZMQ_REQ);
|
req = zmq_socket (ctx, ZMQ_REQ);
|
||||||
assert (req);
|
assert (req);
|
||||||
@ -216,6 +215,11 @@ int main (void)
|
|||||||
assert (rc == 0);
|
assert (rc == 0);
|
||||||
rc = pthread_create (&threads [1], NULL, req_socket_monitor, ctx);
|
rc = pthread_create (&threads [1], NULL, req_socket_monitor, ctx);
|
||||||
assert (rc == 0);
|
assert (rc == 0);
|
||||||
|
sleep(1);
|
||||||
|
|
||||||
|
// Bind REQ and REP
|
||||||
|
rc = zmq_bind (rep, addr.c_str());
|
||||||
|
assert (rc == 0);
|
||||||
|
|
||||||
rc = zmq_connect (req, addr.c_str());
|
rc = zmq_connect (req, addr.c_str());
|
||||||
assert (rc == 0);
|
assert (rc == 0);
|
||||||
@ -271,4 +275,3 @@ int main (void)
|
|||||||
|
|
||||||
return 0 ;
|
return 0 ;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user