diff --git a/AUTHORS b/AUTHORS index 4cceee62..169e9e0d 100644 --- a/AUTHORS +++ b/AUTHORS @@ -98,6 +98,7 @@ Trevor Bernard Vitaly Mayatskikh Lourens Naudé Hardeep Singh +André Caron Credits ======= diff --git a/CMakeLists.txt b/CMakeLists.txt index 8b9fe651..b4becc58 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -604,6 +604,7 @@ set(tests test_probe_router test_stream test_stream_empty + test_stream_disconnect_notifications test_disconnect_inproc test_ctx_options test_ctx_destroy diff --git a/doc/zmq_socket.txt b/doc/zmq_socket.txt index bddd8a2f..6fb86ae2 100644 --- a/doc/zmq_socket.txt +++ b/doc/zmq_socket.txt @@ -352,7 +352,9 @@ To open a connection to a server, use the zmq_connect call, and then fetch the socket identity using the ZMQ_IDENTITY zmq_getsockopt call. To close a specific client connection, as a server, send the identity frame -followed by a zero-length message (see EXAMPLE section). +followed by a zero-length message (see EXAMPLE section). Similarly, when the +peer disconnects (or the connection is lost), a zero-length message will be +received by the application. The ZMQ_SNDMORE flag is ignored on data frames. You must send one identity frame followed by one data frame. diff --git a/src/stream_engine.cpp b/src/stream_engine.cpp index c4880564..ab437dc1 100644 --- a/src/stream_engine.cpp +++ b/src/stream_engine.cpp @@ -780,6 +780,14 @@ int zmq::stream_engine_t::write_subscription_msg (msg_t *msg_) void zmq::stream_engine_t::error () { + if (options.raw_sock) { + // For raw sockets, send a final 0-length message to the application + // so that it knows the peer has been disconnected. + msg_t terminator; + terminator.init(); + (this->*write_msg) (&terminator); + terminator.close(); + } zmq_assert (session); socket->event_disconnected (endpoint, s); session->flush (); diff --git a/tests/Makefile.am b/tests/Makefile.am index 7238c5da..a437b2f9 100644 --- a/tests/Makefile.am +++ b/tests/Makefile.am @@ -25,6 +25,7 @@ noinst_PROGRAMS = test_system \ test_probe_router \ test_stream \ test_stream_empty \ + test_stream_disconnect_notifications \ test_disconnect_inproc \ test_ctx_options \ test_ctx_destroy \ @@ -90,6 +91,7 @@ test_router_handover_SOURCES = test_router_handover.cpp test_probe_router_SOURCES = test_probe_router.cpp test_stream_SOURCES = test_stream.cpp test_stream_empty_SOURCES = test_stream_empty.cpp +test_stream_disconnect_notifications_SOURCES = test_stream_disconnect_notifications.cpp test_disconnect_inproc_SOURCES = test_disconnect_inproc.cpp test_ctx_options_SOURCES = test_ctx_options.cpp test_iov_SOURCES = test_iov.cpp diff --git a/tests/test_stream_disconnect_notifications.cpp b/tests/test_stream_disconnect_notifications.cpp new file mode 100644 index 00000000..c2a868d1 --- /dev/null +++ b/tests/test_stream_disconnect_notifications.cpp @@ -0,0 +1,200 @@ +/* + Copyright (c) 2007-2013 Contributors as noted in the AUTHORS file + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include "testutil.hpp" + +static const int SERVER = 0; +static const int CLIENT = 1; + +struct test_message_t { + int turn; + const char * text; +}; + +// NOTE: messages are sent without null terminator. +const test_message_t dialog [] = { + {CLIENT, "i can haz cheez burger?"}, + {SERVER, "y u no disonnect?"}, + {CLIENT, ""}, +}; +const int steps = sizeof(dialog) / sizeof(dialog[0]); + +bool has_more (void* socket) +{ + int more = 0; + size_t more_size = sizeof(more); + int rc = zmq_getsockopt (socket, ZMQ_RCVMORE, &more, &more_size); + if (rc != 0) { + return false; + } + return more != 0; +} + +bool get_identity (void* socket, char* data, size_t* size) +{ + int rc = zmq_getsockopt (socket, ZMQ_IDENTITY, data, size); + return rc == 0; +} + +int main(int argc, char** argv) +{ + setup_test_environment(); + + void* context = zmq_ctx_new (); + void* sockets [2]; + int rc = 0; + + sockets [SERVER] = zmq_socket (context, ZMQ_STREAM); + rc = zmq_bind (sockets [SERVER], "tcp://0.0.0.0:6666"); + assert (rc == 0); + + sockets [CLIENT] = zmq_socket (context, ZMQ_STREAM); + rc = zmq_connect (sockets [CLIENT], "tcp://localhost:6666"); + assert (rc == 0); + + // TODO: wait for client to become ready. + + // Send initial message. + char blob_data [256]; + size_t blob_size = sizeof(blob_data); + rc = zmq_getsockopt (sockets [CLIENT], ZMQ_IDENTITY, blob_data, &blob_size); + assert (rc == 0); + zmq_msg_t msg; + zmq_msg_init_size (&msg, blob_size); + memcpy (zmq_msg_data (&msg), blob_data, blob_size); + zmq_msg_send (&msg, sockets [dialog [0].turn], ZMQ_SNDMORE); + zmq_msg_close (&msg); + zmq_msg_init_size (&msg, strlen(dialog [0].text)+1); + memcpy (zmq_msg_data (&msg), dialog [0].text, strlen(dialog [0].text)+1); + zmq_msg_send (&msg, sockets [dialog [0].turn], ZMQ_SNDMORE); + zmq_msg_close (&msg); + + // TODO: make sure this loop doesn't loop forever if something is wrong + // with the test (or the implementation). + + int step = 0; + while (step < steps) { + // Wait until something happens. + zmq_pollitem_t items [] = { + { sockets [SERVER], 0, ZMQ_POLLIN, 0 }, + { sockets [CLIENT], 0, ZMQ_POLLIN, 0 }, + }; + int rc = zmq_poll (items, 2, 100); + assert (rc >= 0); + + printf ("Event received for step %d.\n", step); + + // Check for data received by the server. + if (items [SERVER].revents & ZMQ_POLLIN) { + assert (dialog [step].turn == CLIENT); + + // Grab the 1st frame (peer identity). + zmq_msg_t peer_frame; + zmq_msg_init (&peer_frame); + zmq_msg_recv (&peer_frame, sockets [SERVER], 0); + assert (has_more (sockets [SERVER])); + + // Grab the 2nd frame (actual payload). + zmq_msg_t data_frame; + zmq_msg_init (&data_frame); + zmq_msg_recv (&data_frame, sockets [SERVER], 0); + + // Make sure payload matches what we expect. + const char * const data = (const char*)zmq_msg_data (&data_frame); + const int size = zmq_msg_size (&data_frame); + int cmp = memcmp(dialog [step].text, data, size); + assert (cmp == 0); + + ++step; + + // 0-length frame is a disconnection notification. The server + // should receive it as the last step in the dialogue. + if (size == 0) { + printf ("server received disconnection notification!\n"); + assert (step == steps); + } + else { + printf ("server received %d bytes.\n", size); + assert (step < steps); + + // Prepare the response. + zmq_msg_close (&data_frame); + zmq_msg_init_size (&data_frame, strlen (dialog [step].text)); + memcpy (zmq_msg_data (&data_frame), dialog [step].text, zmq_msg_size (&data_frame)); + + // Send the response. + printf ("server sending %d bytes.\n", (int)zmq_msg_size (&data_frame)); + zmq_msg_send (&peer_frame, sockets [SERVER], ZMQ_SNDMORE); + zmq_msg_send (&data_frame, sockets [SERVER], ZMQ_SNDMORE); + } + + // Release resources. + zmq_msg_close (&peer_frame); + zmq_msg_close (&data_frame); + } + + // Check for data received by the client. + if (items [CLIENT].revents & ZMQ_POLLIN) { + assert (dialog [step].turn == SERVER); + + // Grab the 1st frame (peer identity). + zmq_msg_t peer_frame; + zmq_msg_init (&peer_frame); + zmq_msg_recv (&peer_frame, sockets [CLIENT], 0); + assert (has_more (sockets [CLIENT])); + + // Grab the 2nd frame (actual payload). + zmq_msg_t data_frame; + zmq_msg_init (&data_frame); + zmq_msg_recv (&data_frame, sockets [CLIENT], 0); + + // Make sure payload matches what we expect. + const char * const data = (const char*)zmq_msg_data (&data_frame); + const int size = zmq_msg_size (&data_frame); + int cmp = memcmp(dialog [step].text, data, size); + assert (cmp == 0); + + printf ("client received %d bytes.\n", size); + + ++step; + + // Prepare the response (next line in the dialog). + assert (step < steps); + zmq_msg_close (&data_frame); + zmq_msg_init_size (&data_frame, strlen (dialog [step].text)); + memcpy (zmq_msg_data (&data_frame), dialog [step].text, zmq_msg_size (&data_frame)); + + // Send the response. + printf ("client sending %d bytes.\n", (int)zmq_msg_size (&data_frame)); + zmq_msg_send (&peer_frame, sockets [CLIENT], ZMQ_SNDMORE); + zmq_msg_send (&data_frame, sockets [CLIENT], ZMQ_SNDMORE); + + // Release resources. + zmq_msg_close (&peer_frame); + zmq_msg_close (&data_frame); + } + } + assert (step == steps); + + printf ("Done, exiting now.\n"); + zmq_close (sockets [CLIENT]); + zmq_close (sockets [SERVER]); + zmq_ctx_term (context); + return 0; +}