From 70bc7dd925200799dd284f7224363f5ea6c8f4c8 Mon Sep 17 00:00:00 2001 From: Doron Somech Date: Sun, 9 Feb 2020 22:04:56 +0200 Subject: [PATCH] problem: zeromq doesn't has a thread-safe peer to peer socket Solution: a new socket type, called PEER. Very similar to SERVER, but can only connect to other PEERs. Also a new zmq_connect_peer method, that connect and return a routing-id in thread-safe and atomic operation --- .gitignore | 4 +- CMakeLists.txt | 2 + Makefile.am | 9 +++- doc/Makefile.am | 2 +- doc/zmq_connect_peer.txt | 92 +++++++++++++++++++++++++++++++ doc/zmq_socket.txt | 43 +++++++++++++++ include/zmq.h | 8 +-- src/mechanism.cpp | 5 +- src/peer.cpp | 68 +++++++++++++++++++++++ src/peer.hpp | 67 +++++++++++++++++++++++ src/server.hpp | 2 +- src/session_base.cpp | 1 + src/socket_base.cpp | 10 +++- src/socket_base.hpp | 8 +-- src/zmq.cpp | 23 ++++++++ src/zmq_draft.h | 1 + tests/CMakeLists.txt | 1 + tests/test_peer.cpp | 113 +++++++++++++++++++++++++++++++++++++++ 18 files changed, 447 insertions(+), 12 deletions(-) create mode 100644 doc/zmq_connect_peer.txt create mode 100644 src/peer.cpp create mode 100644 src/peer.hpp create mode 100644 tests/test_peer.cpp diff --git a/.gitignore b/.gitignore index c0d0d035..cfc3278c 100644 --- a/.gitignore +++ b/.gitignore @@ -156,12 +156,14 @@ test_ws_transport test_wss_transport test_socks test_xpub_manual_last_value +test_peer unittest_ip_resolver unittest_mtrie unittest_poller unittest_radix_tree unittest_udp_address unittest_ypipe +unittests/unittest_curve_encoding tests/test*.log tests/test*.trs unittests/unittest*.log @@ -205,4 +207,4 @@ core build test-suite.log .idea/ -cmake-build-debug/ \ No newline at end of file +cmake-build-debug/ diff --git a/CMakeLists.txt b/CMakeLists.txt index 2ba77858..4d31f9af 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -822,6 +822,7 @@ set(cxx-sources own.cpp null_mechanism.cpp pair.cpp + peer.cpp pgm_receiver.cpp pgm_sender.cpp pgm_socket.cpp @@ -947,6 +948,7 @@ set(cxx-sources options.hpp own.hpp pair.hpp + peer.hpp pgm_receiver.hpp pgm_sender.hpp pgm_socket.hpp diff --git a/Makefile.am b/Makefile.am index 746545cc..c317245b 100644 --- a/Makefile.am +++ b/Makefile.am @@ -126,6 +126,8 @@ src_libzmq_la_SOURCES = \ src/own.hpp \ src/pair.cpp \ src/pair.hpp \ + src/peer.cpp \ + src/peer.hpp \ src/pgm_receiver.cpp \ src/pgm_receiver.hpp \ src/pgm_sender.cpp \ @@ -1034,7 +1036,8 @@ test_apps += tests/test_poller \ tests/test_dgram \ tests/test_app_meta \ tests/test_xpub_manual_last_value \ - tests/test_router_notify + tests/test_router_notify \ + tests/test_peer tests_test_poller_SOURCES = tests/test_poller.cpp tests_test_poller_LDADD = ${TESTUTIL_LIBS} src/libzmq.la @@ -1075,6 +1078,10 @@ tests_test_app_meta_CPPFLAGS = ${TESTUTIL_CPPFLAGS} tests_test_router_notify_SOURCES = tests/test_router_notify.cpp tests_test_router_notify_LDADD = ${TESTUTIL_LIBS} src/libzmq.la tests_test_router_notify_CPPFLAGS = ${TESTUTIL_CPPFLAGS} + +tests_test_peer_SOURCES = tests/test_peer.cpp +tests_test_peer_LDADD = ${TESTUTIL_LIBS} src/libzmq.la +tests_test_peer_CPPFLAGS = ${TESTUTIL_CPPFLAGS} endif if ENABLE_STATIC diff --git a/doc/Makefile.am b/doc/Makefile.am index ba63d705..1ff9caa1 100644 --- a/doc/Makefile.am +++ b/doc/Makefile.am @@ -1,7 +1,7 @@ # # documentation # -MAN3 = zmq_bind.3 zmq_unbind.3 zmq_connect.3 zmq_disconnect.3 zmq_close.3 \ +MAN3 = zmq_bind.3 zmq_unbind.3 zmq_connect.3 zmq_connect_peer.3 zmq_disconnect.3 zmq_close.3 \ zmq_ctx_new.3 zmq_ctx_term.3 zmq_ctx_get.3 zmq_ctx_set.3 zmq_ctx_shutdown.3 \ zmq_msg_init.3 zmq_msg_init_data.3 zmq_msg_init_size.3 \ zmq_msg_move.3 zmq_msg_copy.3 zmq_msg_size.3 zmq_msg_data.3 zmq_msg_close.3 \ diff --git a/doc/zmq_connect_peer.txt b/doc/zmq_connect_peer.txt new file mode 100644 index 00000000..5375d1ba --- /dev/null +++ b/doc/zmq_connect_peer.txt @@ -0,0 +1,92 @@ +zmq_connect_peer(3) +=================== + + +NAME +---- +zmq_connect_peer - create outgoing connection from socket and return the connection routing id in thread-safe and atomic way. + + +SYNOPSIS +-------- +*uint32_t zmq_connect_peer (void '*socket', const char '*endpoint');* + + +DESCRIPTION +----------- +The _zmq_connect_peer()_ function connects a 'ZMQ_PEER' socket to an 'endpoint' and then returns the endpoint 'routing_id'. + +The 'endpoint' is a string consisting of a 'transport'`://` followed by an +'address'. The 'transport' specifies the underlying protocol to use. The +'address' specifies the transport-specific address to connect to. + +The function is supported only on the 'ZMQ_PEER' socket type and would return `0` with 'errno' set to 'ENOTSUP' otherwise. + +The _zmq_connect_peer()_ support the following transports: + +'tcp':: unicast transport using TCP, see linkzmq:zmq_tcp[7] +'ipc':: local inter-process communication transport, see linkzmq:zmq_ipc[7] +'inproc':: local in-process (inter-thread) communication transport, see linkzmq:zmq_inproc[7] +'ws':: unicast transport using WebSockets, see linkzmq:zmq_ws[7] +'wss':: unicast transport using WebSockets over TLS, see linkzmq:zmq_wss[7] + +RETURN VALUE +------------ +The _zmq_connect_peer()_ function returns the peer 'routing_id' if successful. Otherwise it returns +`0` and sets 'errno' to one of the values defined below. + + +ERRORS +------ +*EINVAL*:: +The endpoint supplied is invalid. +*EPROTONOSUPPORT*:: +The requested 'transport' protocol is not supported with 'ZMQ_PEER'. +*ENOCOMPATPROTO*:: +The requested 'transport' protocol is not compatible with the socket type. +*ETERM*:: +The 0MQ 'context' associated with the specified 'socket' was terminated. +*ENOTSOCK*:: +The provided 'socket' was invalid. +*EMTHREAD*:: +No I/O thread is available to accomplish the task. +*ENOTSUP*:: +The socket is not of type 'ZMQ_PEER'. +*EFAULT*:: +The 'ZMQ_IMMEDIATE' option is set on the socket. + +EXAMPLE +------- +.Connecting a peer socket to a TCP transport and sending a message +---- +/* Create a ZMQ_SUB socket */ +void *socket = zmq_socket (context, ZMQ_PEER); +assert (socket); +/* Connect it to the host server001, port 5555 using a TCP transport */ +uint32_t routing_id = zmq_connect (socket, "tcp://server001:5555"); +assert (routing_id == 0); +/* Sending a message to the peer */ +zmq_msg_t msg; +int rc = zmq_msg_init_data (&msg, "HELLO", 5, NULL, NULL); +assert (rc == 0); +rc = zmq_msg_set_routing_id (&msg, routing_id); +assert (rc == 0); +rc = zmq_msg_send (&msg, socket, 0); +assert (rc == 5); +rc = zmq_msg_close (&msg); +assert (rc == 0); +---- + + +SEE ALSO +-------- +linkzmq:zmq_connect[3] +linkzmq:zmq_bind[3] +linkzmq:zmq_socket[3] +linkzmq:zmq[7] + + +AUTHORS +------- +This page was written by the 0MQ community. To make a change please +read the 0MQ Contribution Policy at . diff --git a/doc/zmq_socket.txt b/doc/zmq_socket.txt index a0ff7c7c..af247e92 100644 --- a/doc/zmq_socket.txt +++ b/doc/zmq_socket.txt @@ -59,6 +59,7 @@ Following are the thread safe sockets: * ZMQ_RADIO * ZMQ_SCATTER * ZMQ_GATHER +* ZMQ_PEER .Socket types The following sections present the socket types defined by 0MQ, grouped by the @@ -434,6 +435,48 @@ Outgoing routing strategy:: N/A Action in mute state:: Block +Peer-to-peer pattern +~~~~~~~~~~~~~~~~~~~~~~ + +The peer-to-peer pattern is used to connect a peer to multiple peers. +Peer can both connect and bind and mix both of them with the same socket. +The peer-to-peer pattern is useful to build peer-to-peer networks (e.g zyre, bitcoin, torrent) +where a peer can both accept connections from other peers or connect to them. + +NOTE: Peer-to-peer is still in draft phase. + +ZMQ_PEER +^^^^^^^^ +A 'ZMQ_PEER' socket talks to a set of 'ZMQ_PEER' sockets. + +To connect and fetch the 'routing_id' of the peer use linkzmq:zmq_connect_peer[3]. + +Each received message has a 'routing_id' that is a 32-bit unsigned integer. +The application can fetch this with linkzmq:zmq_msg_routing_id[3]. + +To send a message to a given 'ZMQ_PEER' peer the application must set the peer's +'routing_id' on the message, using linkzmq:zmq_msg_set_routing_id[3]. + +If the 'routing_id' is not specified, or does not refer to a connected client +peer, the send call will fail with EHOSTUNREACH. If the outgoing buffer for +the peer is full, the send call shall block, unless ZMQ_DONTWAIT is +used in the send, in which case it shall fail with EAGAIN. The 'ZMQ_PEER' +socket shall not drop messages in any case. + +NOTE: 'ZMQ_PEER' sockets are threadsafe. They do not accept the ZMQ_SNDMORE +option on sends not ZMQ_RCVMORE on receives. This limits them to single part +data. + +[horizontal] +.Summary of ZMQ_PEER characteristics +Compatible peer sockets:: 'ZMQ_PEER' +Direction:: Bidirectional +Send/receive pattern:: Unrestricted +Outgoing routing strategy:: See text +Incoming routing strategy:: Fair-queued +Action in mute state:: Return EAGAIN + + Native Pattern ~~~~~~~~~~~~~~ The native pattern is used for communicating with TCP peers and allows diff --git a/include/zmq.h b/include/zmq.h index c3c2d0c8..0759598a 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -263,7 +263,7 @@ typedef struct zmq_msg_t #endif } zmq_msg_t; -typedef void(zmq_free_fn) (void *data_, void *hint_); +typedef void (zmq_free_fn) (void *data_, void *hint_); ZMQ_EXPORT int zmq_msg_init (zmq_msg_t *msg_); ZMQ_EXPORT int zmq_msg_init_size (zmq_msg_t *msg_, size_t size_); @@ -597,7 +597,7 @@ ZMQ_EXPORT void zmq_atomic_counter_destroy (void **counter_p_); #define ZMQ_HAVE_TIMERS -typedef void(zmq_timer_fn) (int timer_id, void *arg); +typedef void (zmq_timer_fn) (int timer_id, void *arg); ZMQ_EXPORT void *zmq_timers_new (void); ZMQ_EXPORT int zmq_timers_destroy (void **timers_p); @@ -634,7 +634,7 @@ ZMQ_EXPORT unsigned long zmq_stopwatch_stop (void *watch_); /* Sleeps for specified number of seconds. */ ZMQ_EXPORT void zmq_sleep (int seconds_); -typedef void(zmq_thread_fn) (void *); +typedef void (zmq_thread_fn) (void *); /* Start a thread. Returns a handle to the thread. */ ZMQ_EXPORT void *zmq_threadstart (zmq_thread_fn *func_, void *arg_); @@ -658,6 +658,7 @@ ZMQ_EXPORT void zmq_threadclose (void *thread_); #define ZMQ_GATHER 16 #define ZMQ_SCATTER 17 #define ZMQ_DGRAM 18 +#define ZMQ_PEER 19 /* DRAFT Socket options. */ #define ZMQ_ZAP_ENFORCE_DOMAIN 93 @@ -694,6 +695,7 @@ ZMQ_EXPORT int zmq_ctx_get_ext (void *context_, /* DRAFT Socket methods. */ ZMQ_EXPORT int zmq_join (void *s, const char *group); ZMQ_EXPORT int zmq_leave (void *s, const char *group); +ZMQ_EXPORT uint32_t zmq_connect_peer (void *s_, const char *addr_); /* DRAFT Msg methods. */ ZMQ_EXPORT int zmq_msg_set_routing_id (zmq_msg_t *msg, uint32_t routing_id); diff --git a/src/mechanism.cpp b/src/mechanism.cpp index 93945a33..004e388c 100644 --- a/src/mechanism.cpp +++ b/src/mechanism.cpp @@ -93,6 +93,7 @@ const char socket_type_dish[] = "DISH"; const char socket_type_gather[] = "GATHER"; const char socket_type_scatter[] = "SCATTER"; const char socket_type_dgram[] = "DGRAM"; +const char socket_type_peer[] = "PEER"; #endif const char *zmq::mechanism_t::socket_type_string (int socket_type_) @@ -106,7 +107,7 @@ const char *zmq::mechanism_t::socket_type_string (int socket_type_) #ifdef ZMQ_BUILD_DRAFT_API socket_type_server, socket_type_client, socket_type_radio, socket_type_dish, socket_type_gather, socket_type_scatter, - socket_type_dgram + socket_type_dgram, socket_type_peer #endif }; static const size_t names_count = sizeof (names) / sizeof (names[0]); @@ -353,6 +354,8 @@ bool zmq::mechanism_t::check_socket_type (const char *type_, return strequals (type_, len_, socket_type_gather); case ZMQ_DGRAM: return strequals (type_, len_, socket_type_dgram); + case ZMQ_PEER: + return strequals (type_, len_, socket_type_peer); #endif default: break; diff --git a/src/peer.cpp b/src/peer.cpp new file mode 100644 index 00000000..027fb57a --- /dev/null +++ b/src/peer.cpp @@ -0,0 +1,68 @@ +/* + Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file + + This file is part of libzmq, the ZeroMQ core engine in C++. + + libzmq is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License (LGPL) as published + by the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + As a special exception, the Contributors give you permission to link + this library with independent modules to produce an executable, + regardless of the license terms of these independent modules, and to + copy and distribute the resulting executable under terms of your choice, + provided that you also meet, for each linked independent module, the + terms and conditions of the license of that module. An independent + module is a module which is not derived from or based on this library. + If you modify this library, you must extend this exception to your + version of the library. + + libzmq is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include "precompiled.hpp" +#include "macros.hpp" +#include "peer.hpp" +#include "pipe.hpp" +#include "wire.hpp" +#include "random.hpp" +#include "likely.hpp" +#include "err.hpp" + +zmq::peer_t::peer_t (class ctx_t *parent_, uint32_t tid_, int sid_) : + server_t (parent_, tid_, sid_) +{ + options.type = ZMQ_PEER; +} + +uint32_t zmq::peer_t::connect_peer (const char *endpoint_uri_) +{ + scoped_optional_lock_t sync_lock (&_sync); + + // connect_peer cannot work with immediate enabled + if (options.immediate == 1) { + errno = EFAULT; + return 0; + } + + int rc = socket_base_t::connect_internal (endpoint_uri_); + if (rc != 0) + return 0; + + return _peer_last_routing_id; +} + +void zmq::peer_t::xattach_pipe (pipe_t *pipe_, + bool subscribe_to_all_, + bool locally_initiated_) +{ + server_t::xattach_pipe (pipe_, subscribe_to_all_, locally_initiated_); + _peer_last_routing_id = pipe_->get_server_socket_routing_id (); +} diff --git a/src/peer.hpp b/src/peer.hpp new file mode 100644 index 00000000..96674b29 --- /dev/null +++ b/src/peer.hpp @@ -0,0 +1,67 @@ +/* + Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file + + This file is part of libzmq, the ZeroMQ core engine in C++. + + libzmq is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License (LGPL) as published + by the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + As a special exception, the Contributors give you permission to link + this library with independent modules to produce an executable, + regardless of the license terms of these independent modules, and to + copy and distribute the resulting executable under terms of your choice, + provided that you also meet, for each linked independent module, the + terms and conditions of the license of that module. An independent + module is a module which is not derived from or based on this library. + If you modify this library, you must extend this exception to your + version of the library. + + libzmq is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#ifndef __ZMQ_PEER_HPP_INCLUDED__ +#define __ZMQ_PEER_HPP_INCLUDED__ + +#include + +#include "socket_base.hpp" +#include "server.hpp" +#include "session_base.hpp" +#include "stdint.hpp" +#include "blob.hpp" +#include "fq.hpp" + +namespace zmq +{ +class ctx_t; +class msg_t; +class pipe_t; + +class peer_t ZMQ_FINAL : public server_t +{ + public: + peer_t (zmq::ctx_t *parent_, uint32_t tid_, int sid_); + + // Overrides of functions from socket_base_t. + void xattach_pipe (zmq::pipe_t *pipe_, + bool subscribe_to_all_, + bool locally_initiated_); + + uint32_t connect_peer (const char *endpoint_uri_); + + private: + uint32_t _peer_last_routing_id; + + ZMQ_NON_COPYABLE_NOR_MOVABLE (peer_t) +}; +} + +#endif diff --git a/src/server.hpp b/src/server.hpp index 72c2ce41..cef1e4d2 100644 --- a/src/server.hpp +++ b/src/server.hpp @@ -45,7 +45,7 @@ class msg_t; class pipe_t; // TODO: This class uses O(n) scheduling. Rewrite it to use O(1) algorithm. -class server_t ZMQ_FINAL : public socket_base_t +class server_t : public socket_base_t { public: server_t (zmq::ctx_t *parent_, uint32_t tid_, int sid_); diff --git a/src/session_base.cpp b/src/session_base.cpp index f4f68400..c806a7eb 100644 --- a/src/session_base.cpp +++ b/src/session_base.cpp @@ -87,6 +87,7 @@ zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_, case ZMQ_GATHER: case ZMQ_SCATTER: case ZMQ_DGRAM: + case ZMQ_PEER: s = new (std::nothrow) session_base_t (io_thread_, active_, socket_, options_, addr_); break; diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 9fe97d0e..07223097 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -100,6 +100,7 @@ #include "gather.hpp" #include "scatter.hpp" #include "dgram.hpp" +#include "peer.hpp" void zmq::socket_base_t::inprocs_t::emplace (const char *endpoint_uri_, pipe_t *pipe_) @@ -207,6 +208,9 @@ zmq::socket_base_t *zmq::socket_base_t::create (int type_, case ZMQ_DGRAM: s = new (std::nothrow) dgram_t (parent_, tid_, sid_); break; + case ZMQ_PEER: + s = new (std::nothrow) peer_t (parent_, tid_, sid_); + break; default: errno = EINVAL; return NULL; @@ -228,6 +232,7 @@ zmq::socket_base_t::socket_base_t (ctx_t *parent_, int sid_, bool thread_safe_) : own_t (parent_, tid_), + _sync (), _tag (0xbaddecaf), _ctx_terminated (false), _destroyed (false), @@ -240,7 +245,6 @@ zmq::socket_base_t::socket_base_t (ctx_t *parent_, _monitor_events (0), _thread_safe (thread_safe_), _reaper_signaler (NULL), - _sync (), _monitor_sync () { options.socket_id = sid_; @@ -740,7 +744,11 @@ int zmq::socket_base_t::bind (const char *endpoint_uri_) int zmq::socket_base_t::connect (const char *endpoint_uri_) { scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL); + return connect_internal (endpoint_uri_); +} +int zmq::socket_base_t::connect_internal (const char *endpoint_uri_) +{ if (unlikely (_ctx_terminated)) { errno = ETERM; return -1; diff --git a/src/socket_base.hpp b/src/socket_base.hpp index 784147c9..c6066741 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -205,6 +205,11 @@ class socket_base_t : public own_t, // Delay actual destruction of the socket. void process_destroy () ZMQ_FINAL; + int connect_internal (const char *endpoint_uri_); + + // Mutex for synchronize access to the socket in thread safe mode + mutex_t _sync; + private: // test if event should be sent and then dispatch it void event (const endpoint_uri_pair_t &endpoint_uri_pair_, @@ -336,9 +341,6 @@ class socket_base_t : public own_t, // Signaler to be used in the reaping stage signaler_t *_reaper_signaler; - // Mutex for synchronize access to the socket in thread safe mode - mutex_t _sync; - // Mutex to synchronize access to the monitor Pair socket mutex_t _monitor_sync; diff --git a/src/zmq.cpp b/src/zmq.cpp index f0afb733..f2c7b42f 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -40,6 +40,7 @@ #include "macros.hpp" #include "poller.hpp" +#include "peer.hpp" #if !defined ZMQ_HAVE_POLLER // On AIX platform, poll.h has to be included first to get consistent @@ -336,6 +337,28 @@ int zmq_connect (void *s_, const char *addr_) return s->connect (addr_); } +uint32_t zmq_connect_peer (void *s_, const char *addr_) +{ + zmq::peer_t *s = static_cast (s_); + if (!s_ || !s->check_tag ()) { + errno = ENOTSOCK; + return 0; + } + + int socket_type; + size_t socket_type_size = sizeof (socket_type); + if (s->getsockopt (ZMQ_TYPE, &socket_type, &socket_type_size) != 0) + return 0; + + if (socket_type != ZMQ_PEER) { + errno = ENOTSUP; + return 0; + } + + return s->connect_peer (addr_); +} + + int zmq_unbind (void *s_, const char *addr_) { zmq::socket_base_t *s = as_socket_base_t (s_); diff --git a/src/zmq_draft.h b/src/zmq_draft.h index 7ebcb617..a1c0c5cc 100644 --- a/src/zmq_draft.h +++ b/src/zmq_draft.h @@ -45,6 +45,7 @@ #define ZMQ_GATHER 16 #define ZMQ_SCATTER 17 #define ZMQ_DGRAM 18 +#define ZMQ_PEER 19 /* DRAFT Socket options. */ #define ZMQ_ZAP_ENFORCE_DOMAIN 93 diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 8ce6a859..04713300 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -162,6 +162,7 @@ if(ENABLE_DRAFTS) test_app_meta test_router_notify test_xpub_manual_last_value + test_peer ) endif() diff --git a/tests/test_peer.cpp b/tests/test_peer.cpp new file mode 100644 index 00000000..30127f58 --- /dev/null +++ b/tests/test_peer.cpp @@ -0,0 +1,113 @@ +/* + Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file + + This file is part of libzmq, the ZeroMQ core engine in C++. + + libzmq is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License (LGPL) as published + by the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + As a special exception, the Contributors give you permission to link + this library with independent modules to produce an executable, + regardless of the license terms of these independent modules, and to + copy and distribute the resulting executable under terms of your choice, + provided that you also meet, for each linked independent module, the + terms and conditions of the license of that module. An independent + module is a module which is not derived from or based on this library. + If you modify this library, you must extend this exception to your + version of the library. + + libzmq is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include "testutil.hpp" +#include "testutil_unity.hpp" + +SETUP_TEARDOWN_TESTCONTEXT + +void test_peer () +{ + size_t len = MAX_SOCKET_STRING; + char my_endpoint[MAX_SOCKET_STRING]; + + void *peer1 = test_context_socket (ZMQ_PEER); + bind_loopback (peer1, false, my_endpoint, len); + + void *peer2 = test_context_socket (ZMQ_PEER); + uint32_t peer1_routing_id = zmq_connect_peer (peer2, my_endpoint); + TEST_ASSERT_NOT_EQUAL (0, peer1_routing_id); + + { + zmq_msg_t msg; + TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init_size (&msg, 1)); + + char *data = static_cast (zmq_msg_data (&msg)); + data[0] = 1; + + TEST_ASSERT_SUCCESS_ERRNO ( + zmq_msg_set_routing_id (&msg, peer1_routing_id)); + + int rc = zmq_msg_send (&msg, peer2, 0); + TEST_ASSERT_EQUAL_INT (1, rc); + } + + uint32_t peer2_routing_id; + { + zmq_msg_t msg; + TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&msg)); + + int rc = TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&msg, peer1, 0)); + TEST_ASSERT_EQUAL_INT (1, rc); + + peer2_routing_id = zmq_msg_routing_id (&msg); + TEST_ASSERT_NOT_EQUAL (0, peer2_routing_id); + + TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&msg)); + } + + { + zmq_msg_t msg; + TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init_size (&msg, 1)); + + char *data = static_cast (zmq_msg_data (&msg)); + data[0] = 2; + + TEST_ASSERT_SUCCESS_ERRNO ( + zmq_msg_set_routing_id (&msg, peer2_routing_id)); + + int rc = zmq_msg_send (&msg, peer1, 0); + TEST_ASSERT_EQUAL_INT (1, rc); + } + + { + zmq_msg_t msg; + TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&msg)); + + int rc = zmq_msg_recv (&msg, peer2, 0); + TEST_ASSERT_EQUAL_INT (1, rc); + + uint32_t routing_id = zmq_msg_routing_id (&msg); + TEST_ASSERT_EQUAL_UINT32 (peer1_routing_id, routing_id); + + TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&msg)); + } + + test_context_socket_close (peer1); + test_context_socket_close (peer2); +} + +int main (void) +{ + setup_test_environment (); + + UNITY_BEGIN (); + RUN_TEST (test_peer); + return UNITY_END (); +}