From 9b885b7c006ee2114a83e07d2e84552b11565581 Mon Sep 17 00:00:00 2001 From: Luca Boccassi Date: Mon, 1 Feb 2016 12:33:17 +0000 Subject: [PATCH] Problem: cannot use new pre-allocated FD with IPC Solution: parse the value set by the ZMQ_PRE_ALLOCATED_FD sockopt when creating a new IPC socket and use it if valid. Add new tests/test_pre_allocated_fd_ipc.cpp unit test. --- .gitignore | 1 + Makefile.am | 6 + src/ipc_listener.cpp | 44 +++--- tests/CMakeLists.txt | 1 + tests/test_pre_allocated_fd_ipc.cpp | 213 ++++++++++++++++++++++++++++ 5 files changed, 249 insertions(+), 16 deletions(-) create mode 100644 tests/test_pre_allocated_fd_ipc.cpp diff --git a/.gitignore b/.gitignore index 1a47b83e..5a0f7244 100644 --- a/.gitignore +++ b/.gitignore @@ -125,6 +125,7 @@ test_timers test_radio_dish test_udp test_large_msg +test_pre_allocated_fd_ipc tests/test*.log tests/test*.trs src/platform.hpp* diff --git a/Makefile.am b/Makefile.am index fd86a8dd..833ea848 100644 --- a/Makefile.am +++ b/Makefile.am @@ -617,6 +617,7 @@ test_apps += \ tests/test_shutdown_stress \ tests/test_pair_ipc \ tests/test_reqrep_ipc \ + tests/test_pre_allocated_fd_ipc \ tests/test_timeo \ tests/test_filter_ipc @@ -639,6 +640,11 @@ tests_test_timeo_LDADD = src/libzmq.la tests_test_filter_ipc_SOURCES = tests/test_filter_ipc.cpp tests_test_filter_ipc_LDADD = src/libzmq.la +tests_test_pre_allocated_fd_ipc_SOURCES = \ + tests/test_pre_allocated_fd_ipc.cpp \ + tests/testutil.hpp +tests_test_pre_allocated_fd_ipc_LDADD = src/libzmq.la + if HAVE_FORK test_apps += tests/test_fork diff --git a/src/ipc_listener.cpp b/src/ipc_listener.cpp index 6571f194..b4268ac7 100644 --- a/src/ipc_listener.cpp +++ b/src/ipc_listener.cpp @@ -155,7 +155,12 @@ int zmq::ipc_listener_t::set_address (const char *addr_) // Get rid of the file associated with the UNIX domain socket that // may have been left behind by the previous run of the application. - ::unlink (addr.c_str()); + // MUST NOT unlink if the FD is managed by the user, or it will stop + // working after the first client connects. The user will take care of + // cleaning up the file after the service is stopped. + if (options.pre_allocated_fd == -1) { + ::unlink (addr.c_str()); + } filename.clear (); // Initialise the address structure. @@ -164,26 +169,30 @@ int zmq::ipc_listener_t::set_address (const char *addr_) if (rc != 0) return -1; - // Create a listening socket. - s = open_socket (AF_UNIX, SOCK_STREAM, 0); - if (s == -1) - return -1; - address.to_string (endpoint); - // Bind the socket to the file path. - rc = bind (s, address.addr (), address.addrlen ()); - if (rc != 0) - goto error; + if (options.pre_allocated_fd != -1) { + s = options.pre_allocated_fd; + } else { + // Create a listening socket. + s = open_socket (AF_UNIX, SOCK_STREAM, 0); + if (s == -1) + return -1; + + // Bind the socket to the file path. + rc = bind (s, address.addr (), address.addrlen ()); + if (rc != 0) + goto error; + + // Listen for incoming connections. + rc = listen (s, options.backlog); + if (rc != 0) + goto error; + } filename.assign (addr.c_str()); has_file = true; - // Listen for incoming connections. - rc = listen (s, options.backlog); - if (rc != 0) - goto error; - socket->event_listening (endpoint, s); return 0; @@ -204,7 +213,10 @@ int zmq::ipc_listener_t::close () // If there's an underlying UNIX domain socket, get rid of the file it // is associated with. - if (has_file && !filename.empty ()) { + // MUST NOT unlink if the FD is managed by the user, or it will stop + // working after the first client connects. The user will take care of + // cleaning up the file after the service is stopped. + if (has_file && !filename.empty () && options.pre_allocated_fd == -1) { rc = ::unlink(filename.c_str ()); if (rc != 0) { socket->event_close_failed (endpoint, zmq_errno()); diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 13a4de64..438142ad 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -90,6 +90,7 @@ if(NOT WIN32) test_stream_exceeds_buffer test_router_mandatory_hwm test_term_endpoint_tipc + test_pre_allocated_fd_ipc ) if(HAVE_FORK) list(APPEND tests test_fork) diff --git a/tests/test_pre_allocated_fd_ipc.cpp b/tests/test_pre_allocated_fd_ipc.cpp new file mode 100644 index 00000000..c3358b27 --- /dev/null +++ b/tests/test_pre_allocated_fd_ipc.cpp @@ -0,0 +1,213 @@ +/* + Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file + + This file is part of libzmq, the ZeroMQ core engine in C++. + + libzmq is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License (LGPL) as published + by the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + As a special exception, the Contributors give you permission to link + this library with independent modules to produce an executable, + regardless of the license terms of these independent modules, and to + copy and distribute the resulting executable under terms of your choice, + provided that you also meet, for each linked independent module, the + terms and conditions of the license of that module. An independent + module is a module which is not derived from or based on this library. + If you modify this library, you must extend this exception to your + version of the library. + + libzmq is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include +#include +#include +#include "testutil.hpp" + +void pre_allocate_sock (void *zmq_socket, const char *path) +{ + struct sockaddr_un addr; + addr.sun_family = AF_UNIX; + strcpy (addr.sun_path, path); + + unlink (path); + + int s_pre = socket (AF_UNIX, SOCK_STREAM, 0); + assert (s_pre != -1); + + int rc = bind (s_pre, (struct sockaddr *) &addr, + sizeof (struct sockaddr_un)); + assert (rc == 0); + + rc = listen (s_pre, SOMAXCONN); + assert (rc == 0); + + rc = zmq_setsockopt (zmq_socket, ZMQ_PRE_ALLOCATED_FD, &s_pre, + sizeof (s_pre)); + assert(rc == 0); +} + +void test_req_rep () +{ + void *ctx = zmq_ctx_new (); + assert (ctx); + + void *sb = zmq_socket (ctx, ZMQ_REP); + assert (sb); + + pre_allocate_sock(sb, "/tmp/tester"); + + int rc = zmq_bind (sb, "ipc:///tmp/tester"); + assert (rc == 0); + + void *sc = zmq_socket (ctx, ZMQ_REQ); + assert (sc); + rc = zmq_connect (sc, "ipc:///tmp/tester"); + assert (rc == 0); + + bounce (sb, sc); + + rc = zmq_close (sc); + assert (rc == 0); + + rc = zmq_close (sb); + assert (rc == 0); + + rc = zmq_ctx_term (ctx); + assert (rc == 0); + + rc = unlink ("/tmp/tester"); + assert (rc == 0); +} + +void test_pair () +{ + void *ctx = zmq_ctx_new (); + assert (ctx); + + void *sb = zmq_socket (ctx, ZMQ_PAIR); + assert (sb); + + pre_allocate_sock(sb, "/tmp/tester"); + + int rc = zmq_bind (sb, "ipc:///tmp/tester"); + assert (rc == 0); + + void *sc = zmq_socket (ctx, ZMQ_PAIR); + assert (sc); + rc = zmq_connect (sc, "ipc:///tmp/tester"); + assert (rc == 0); + + bounce (sb, sc); + + rc = zmq_close (sc); + assert (rc == 0); + + rc = zmq_close (sb); + assert (rc == 0); + + rc = zmq_ctx_term (ctx); + assert (rc == 0); + + rc = unlink ("/tmp/tester"); + assert (rc == 0); +} + +void test_client_server () +{ + void *ctx = zmq_ctx_new (); + assert (ctx); + + void *sb = zmq_socket (ctx, ZMQ_SERVER); + assert (sb); + + pre_allocate_sock(sb, "/tmp/tester"); + + int rc = zmq_bind (sb, "ipc:///tmp/tester"); + assert (rc == 0); + + void *sc = zmq_socket (ctx, ZMQ_CLIENT); + assert (sc); + rc = zmq_connect (sc, "ipc:///tmp/tester"); + assert (rc == 0); + + zmq_msg_t msg; + rc = zmq_msg_init_size (&msg, 1); + assert (rc == 0); + + char *data = (char *) zmq_msg_data (&msg); + data [0] = 1; + + rc = zmq_msg_send (&msg, sc, ZMQ_SNDMORE); + assert (rc == -1); + + rc = zmq_msg_send (&msg, sc, 0); + assert (rc == 1); + + rc = zmq_msg_init (&msg); + assert (rc == 0); + + rc = zmq_msg_recv (&msg, sb, 0); + assert (rc == 1); + + uint32_t routing_id = zmq_msg_routing_id (&msg); + assert (routing_id != 0); + + rc = zmq_msg_close (&msg); + assert (rc == 0); + + rc = zmq_msg_init_size (&msg, 1); + assert (rc == 0); + + data = (char *)zmq_msg_data (&msg); + data[0] = 2; + + rc = zmq_msg_set_routing_id (&msg, routing_id); + assert (rc == 0); + + rc = zmq_msg_send (&msg, sb, ZMQ_SNDMORE); + assert (rc == -1); + + rc = zmq_msg_send (&msg, sb, 0); + assert (rc == 1); + + rc = zmq_msg_recv (&msg, sc, 0); + assert (rc == 1); + + routing_id = zmq_msg_routing_id (&msg); + assert (routing_id == 0); + + rc = zmq_msg_close (&msg); + assert (rc == 0); + + rc = zmq_close (sc); + assert (rc == 0); + + rc = zmq_close (sb); + assert (rc == 0); + + rc = zmq_ctx_term (ctx); + assert (rc == 0); + + rc = unlink ("/tmp/tester"); + assert (rc == 0); +} + +int main (void) +{ + setup_test_environment(); + + test_req_rep(); + test_pair(); + test_client_server(); + + return 0 ; +}