0
0
mirror of https://github.com/zeromq/libzmq.git synced 2025-01-14 01:37:56 +08:00

Merge pull request #3904 from somdoron/CHANNEL

problem: no thread-safe alternative for ZMQ_PAIR
This commit is contained in:
Luca Boccassi 2020-05-09 10:49:35 +01:00 committed by GitHub
commit a1d82598bb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 392 additions and 14 deletions

View File

@ -844,6 +844,7 @@ endif()
set(cxx-sources
precompiled.cpp
address.cpp
channel.cpp
client.cpp
clock.cpp
ctx.cpp
@ -947,6 +948,7 @@ set(cxx-sources
atomic_counter.hpp
atomic_ptr.hpp
blob.hpp
channel.hpp
client.hpp
clock.hpp
command.hpp

View File

@ -26,6 +26,8 @@ src_libzmq_la_SOURCES = \
src/atomic_counter.hpp \
src/atomic_ptr.hpp \
src/blob.hpp \
src/channel.cpp \
src/channel.hpp \
src/client.cpp \
src/client.hpp \
src/clock.cpp \
@ -1047,7 +1049,8 @@ test_apps += tests/test_poller \
tests/test_reconnect_options \
tests/test_msg_init \
tests/test_hello_msg \
tests/test_disconnect_msg
tests/test_disconnect_msg \
tests/test_channel
tests_test_poller_SOURCES = tests/test_poller.cpp
tests_test_poller_LDADD = ${TESTUTIL_LIBS} src/libzmq.la
@ -1108,6 +1111,10 @@ tests_test_hello_msg_CPPFLAGS = ${TESTUTIL_CPPFLAGS}
tests_test_disconnect_msg_SOURCES = tests/test_disconnect_msg.cpp
tests_test_disconnect_msg_LDADD = ${TESTUTIL_LIBS} src/libzmq.la
tests_test_disconnect_msg_CPPFLAGS = ${TESTUTIL_CPPFLAGS}
tests_test_channel_SOURCES = tests/test_channel.cpp
tests_test_channel_LDADD = ${TESTUTIL_LIBS} src/libzmq.la
tests_test_channel_CPPFLAGS = ${TESTUTIL_CPPFLAGS}
endif
if FUZZING_ENGINE_LIB

View File

@ -30,7 +30,7 @@ The 'endpoint' is a string consisting of a 'transport'`://` followed by an
'vmci':: virtual machine communications interface (VMCI), see linkzmq:zmq_vmci[7]
'udp':: unreliable unicast and multicast using UDP, see linkzmq:zmq_udp[7]
Every 0MQ socket type except 'ZMQ_PAIR' supports one-to-many and many-to-one
Every 0MQ socket type except 'ZMQ_PAIR' and 'ZMQ_CHANNEL' supports one-to-many and many-to-one
semantics. The precise semantics depend on the socket type and are defined in
linkzmq:zmq_socket[3].

View File

@ -30,7 +30,7 @@ The 'endpoint' is a string consisting of a 'transport'`://` followed by an
'vmci':: virtual machine communications interface (VMCI), see linkzmq:zmq_vmci[7]
'udp':: unreliable unicast and multicast using UDP, see linkzmq:zmq_udp[7]
Every 0MQ socket type except 'ZMQ_PAIR' supports one-to-many and many-to-one
Every 0MQ socket type except 'ZMQ_PAIR' and 'ZMQ_CHANNEL' supports one-to-many and many-to-one
semantics. The precise semantics depend on the socket type and are defined in
linkzmq:zmq_socket[3].
@ -39,7 +39,7 @@ immediately but as needed by 0MQ. Thus a successful call to _zmq_connect()_
does not mean that the connection was or could actually be established.
Because of this, for most transports and socket types the order in which
a 'server' socket is bound and a 'client' socket is connected to it does not
matter. The _ZMQ_PAIR_ sockets are an exception, as they do not automatically
matter. The _ZMQ_PAIR_ and _ZMQ_CHANNEL_ sockets are an exception, as they do not automatically
reconnect to endpoints.
NOTE: following a _zmq_connect()_, for socket types except for ZMQ_ROUTER,

View File

@ -41,7 +41,7 @@ the event that a peer is unavailable to receive them.
Conventional sockets allow only strict one-to-one (two peers), many-to-one
(many clients, one server), or in some cases one-to-many (multicast)
relationships. With the exception of 'ZMQ_PAIR', 0MQ sockets may be connected
relationships. With the exception of 'ZMQ_PAIR' and 'ZMQ_CHANNEL', 0MQ sockets may be connected
*to multiple endpoints* using _zmq_connect()_, while simultaneously accepting
incoming connections *from multiple endpoints* bound to the socket using
_zmq_bind()_, thus allowing many-to-many relationships.
@ -60,6 +60,7 @@ Following are the thread safe sockets:
* ZMQ_SCATTER
* ZMQ_GATHER
* ZMQ_PEER
* ZMQ_CHANNEL
.Socket types
The following sections present the socket types defined by 0MQ, grouped by the
@ -476,6 +477,47 @@ Outgoing routing strategy:: See text
Incoming routing strategy:: Fair-queued
Action in mute state:: Return EAGAIN
Channel pattern
~~~~~~~~~~~~~~~~~~~~~~
The channel pattern is the thread-safe version of the exclusive pair pattern.
The channel pattern is used to connect a peer to precisely one other
peer. This pattern is used for inter-thread communication across the inproc
transport.
NOTE: Channel is still in draft phase.
ZMQ_CHANNEL
^^^^^^^^
A socket of type 'ZMQ_CHANNEL' can only be connected to a single peer at any one
time. No message routing or filtering is performed on messages sent over a
'ZMQ_CHANNEL' socket.
When a 'ZMQ_CHANNEL' socket enters the 'mute' state due to having reached the
high water mark for the connected peer, or, for connection-oriented transports,
if the ZMQ_IMMEDIATE option is set and there is no connected peer, then
any linkzmq:zmq_send[3] operations on the socket shall block until the peer
becomes available for sending; messages are not discarded.
While 'ZMQ_CHANNEL' sockets can be used over transports other than linkzmq:zmq_inproc[7],
their inability to auto-reconnect coupled with the fact new incoming connections will
be terminated while any previous connections (including ones in a closing state)
exist makes them unsuitable for TCP in most cases.
NOTE: 'ZMQ_CHANNEL' sockets are designed for inter-thread communication across
the linkzmq:zmq_inproc[7] transport and do not implement functionality such
as auto-reconnection.
NOTE: 'ZMQ_CHANNEL' sockets are threadsafe. They do not accept ZMQ_RCVMORE on receives.
This limits them to single part data.
[horizontal]
.Summary of ZMQ_CHANNEL characteristics
Compatible peer sockets:: 'ZMQ_CHANNEL'
Direction:: Bidirectional
Send/receive pattern:: Unrestricted
Incoming routing strategy:: N/A
Outgoing routing strategy:: N/A
Action in mute state:: Block
Native Pattern
~~~~~~~~~~~~~~

View File

@ -659,6 +659,7 @@ ZMQ_EXPORT void zmq_threadclose (void *thread_);
#define ZMQ_SCATTER 17
#define ZMQ_DGRAM 18
#define ZMQ_PEER 19
#define ZMQ_CHANNEL 20
/* DRAFT Socket options. */
#define ZMQ_ZAP_ENFORCE_DOMAIN 93

160
src/channel.cpp Normal file
View File

@ -0,0 +1,160 @@
/*
Copyright (c) 2007-2020 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
libzmq is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License (LGPL) as published
by the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
As a special exception, the Contributors give you permission to link
this library with independent modules to produce an executable,
regardless of the license terms of these independent modules, and to
copy and distribute the resulting executable under terms of your choice,
provided that you also meet, for each linked independent module, the
terms and conditions of the license of that module. An independent
module is a module which is not derived from or based on this library.
If you modify this library, you must extend this exception to your
version of the library.
libzmq is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "precompiled.hpp"
#include "macros.hpp"
#include "channel.hpp"
#include "err.hpp"
#include "pipe.hpp"
#include "msg.hpp"
zmq::channel_t::channel_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
socket_base_t (parent_, tid_, sid_, true),
_pipe (NULL)
{
options.type = ZMQ_CHANNEL;
}
zmq::channel_t::~channel_t ()
{
zmq_assert (!_pipe);
}
void zmq::channel_t::xattach_pipe (pipe_t *pipe_,
bool subscribe_to_all_,
bool locally_initiated_)
{
LIBZMQ_UNUSED (subscribe_to_all_);
LIBZMQ_UNUSED (locally_initiated_);
zmq_assert (pipe_ != NULL);
// ZMQ_PAIR socket can only be connected to a single peer.
// The socket rejects any further connection requests.
if (_pipe == NULL)
_pipe = pipe_;
else
pipe_->terminate (false);
}
void zmq::channel_t::xpipe_terminated (pipe_t *pipe_)
{
if (pipe_ == _pipe)
_pipe = NULL;
}
void zmq::channel_t::xread_activated (pipe_t *)
{
// There's just one pipe. No lists of active and inactive pipes.
// There's nothing to do here.
}
void zmq::channel_t::xwrite_activated (pipe_t *)
{
// There's just one pipe. No lists of active and inactive pipes.
// There's nothing to do here.
}
int zmq::channel_t::xsend (msg_t *msg_)
{
// CHANNEL sockets do not allow multipart data (ZMQ_SNDMORE)
if (msg_->flags () & msg_t::more) {
errno = EINVAL;
return -1;
}
if (!_pipe || !_pipe->write (msg_)) {
errno = EAGAIN;
return -1;
}
_pipe->flush ();
// Detach the original message from the data buffer.
const int rc = msg_->init ();
errno_assert (rc == 0);
return 0;
}
int zmq::channel_t::xrecv (msg_t *msg_)
{
// Deallocate old content of the message.
int rc = msg_->close ();
errno_assert (rc == 0);
if (!_pipe) {
// Initialise the output parameter to be a 0-byte message.
rc = msg_->init ();
errno_assert (rc == 0);
errno = EAGAIN;
return -1;
}
// Drop any messages with more flag
bool read = _pipe->read (msg_);
while (read && msg_->flags () & msg_t::more) {
// drop all frames of the current multi-frame message
read = _pipe->read (msg_);
while (read && msg_->flags () & msg_t::more)
read = _pipe->read (msg_);
// get the new message
if (read)
read = _pipe->read (msg_);
}
if (!read) {
// Initialise the output parameter to be a 0-byte message.
rc = msg_->init ();
errno_assert (rc == 0);
errno = EAGAIN;
return -1;
}
return 0;
}
bool zmq::channel_t::xhas_in ()
{
if (!_pipe)
return false;
return _pipe->check_read ();
}
bool zmq::channel_t::xhas_out ()
{
if (!_pipe)
return false;
return _pipe->check_write ();
}

69
src/channel.hpp Normal file
View File

@ -0,0 +1,69 @@
/*
Copyright (c) 2007-2020 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
libzmq is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License (LGPL) as published
by the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
As a special exception, the Contributors give you permission to link
this library with independent modules to produce an executable,
regardless of the license terms of these independent modules, and to
copy and distribute the resulting executable under terms of your choice,
provided that you also meet, for each linked independent module, the
terms and conditions of the license of that module. An independent
module is a module which is not derived from or based on this library.
If you modify this library, you must extend this exception to your
version of the library.
libzmq is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_CHANNEL_HPP_INCLUDED__
#define __ZMQ_CHANNEL_HPP_INCLUDED__
#include "blob.hpp"
#include "socket_base.hpp"
#include "session_base.hpp"
namespace zmq
{
class ctx_t;
class msg_t;
class pipe_t;
class io_thread_t;
class channel_t ZMQ_FINAL : public socket_base_t
{
public:
channel_t (zmq::ctx_t *parent_, uint32_t tid_, int sid_);
~channel_t ();
// Overrides of functions from socket_base_t.
void xattach_pipe (zmq::pipe_t *pipe_,
bool subscribe_to_all_,
bool locally_initiated_);
int xsend (zmq::msg_t *msg_);
int xrecv (zmq::msg_t *msg_);
bool xhas_in ();
bool xhas_out ();
void xread_activated (zmq::pipe_t *pipe_);
void xwrite_activated (zmq::pipe_t *pipe_);
void xpipe_terminated (zmq::pipe_t *pipe_);
private:
zmq::pipe_t *_pipe;
ZMQ_NON_COPYABLE_NOR_MOVABLE (channel_t)
};
}
#endif

View File

@ -94,20 +94,24 @@ const char socket_type_gather[] = "GATHER";
const char socket_type_scatter[] = "SCATTER";
const char socket_type_dgram[] = "DGRAM";
const char socket_type_peer[] = "PEER";
const char socket_type_channel[] = "CHANNEL";
#endif
const char *zmq::mechanism_t::socket_type_string (int socket_type_)
{
// TODO the order must of the names must correspond to the values resp. order of ZMQ_* socket type definitions in zmq.h!
static const char *names[] = {
socket_type_pair, socket_type_pub, socket_type_sub,
socket_type_req, socket_type_rep, socket_type_dealer,
socket_type_router, socket_type_pull, socket_type_push,
socket_type_xpub, socket_type_xsub, socket_type_stream,
static const char *names[] = {socket_type_pair, socket_type_pub,
socket_type_sub, socket_type_req,
socket_type_rep, socket_type_dealer,
socket_type_router, socket_type_pull,
socket_type_push, socket_type_xpub,
socket_type_xsub, socket_type_stream,
#ifdef ZMQ_BUILD_DRAFT_API
socket_type_server, socket_type_client, socket_type_radio,
socket_type_dish, socket_type_gather, socket_type_scatter,
socket_type_dgram, socket_type_peer
socket_type_server, socket_type_client,
socket_type_radio, socket_type_dish,
socket_type_gather, socket_type_scatter,
socket_type_dgram, socket_type_peer,
socket_type_channel
#endif
};
static const size_t names_count = sizeof (names) / sizeof (names[0]);
@ -356,6 +360,8 @@ bool zmq::mechanism_t::check_socket_type (const char *type_,
return strequals (type_, len_, socket_type_dgram);
case ZMQ_PEER:
return strequals (type_, len_, socket_type_peer);
case ZMQ_CHANNEL:
return strequals (type_, len_, socket_type_channel);
#endif
default:
break;

View File

@ -88,6 +88,7 @@ zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_,
case ZMQ_SCATTER:
case ZMQ_DGRAM:
case ZMQ_PEER:
case ZMQ_CHANNEL:
#ifdef ZMQ_BUILD_DRAFT_API
if (options_.can_send_hello_msg && options_.hello_msg.size () > 0)
s = new (std::nothrow) hello_msg_session_t (

View File

@ -105,6 +105,7 @@
#include "scatter.hpp"
#include "dgram.hpp"
#include "peer.hpp"
#include "channel.hpp"
void zmq::socket_base_t::inprocs_t::emplace (const char *endpoint_uri_,
pipe_t *pipe_)
@ -217,6 +218,9 @@ zmq::socket_base_t *zmq::socket_base_t::create (int type_,
case ZMQ_PEER:
s = new (std::nothrow) peer_t (parent_, tid_, sid_);
break;
case ZMQ_CHANNEL:
s = new (std::nothrow) channel_t (parent_, tid_, sid_);
break;
default:
errno = EINVAL;
return NULL;

View File

@ -46,6 +46,7 @@
#define ZMQ_SCATTER 17
#define ZMQ_DGRAM 18
#define ZMQ_PEER 19
#define ZMQ_CHANNEL 20
/* DRAFT Socket options. */
#define ZMQ_ZAP_ENFORCE_DOMAIN 93

View File

@ -156,7 +156,11 @@ if(ENABLE_DRAFTS)
test_xpub_manual_last_value
test_peer
test_reconnect_options
test_msg_init)
test_msg_init
test_channel
test_hello_msg
test_disconnect_msg
)
endif()
if(ZMQ_HAVE_WS)

81
tests/test_channel.cpp Normal file
View File

@ -0,0 +1,81 @@
/*
Copyright (c) 2007-2020 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
libzmq is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License (LGPL) as published
by the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
As a special exception, the Contributors give you permission to link
this library with independent modules to produce an executable,
regardless of the license terms of these independent modules, and to
copy and distribute the resulting executable under terms of your choice,
provided that you also meet, for each linked independent module, the
terms and conditions of the license of that module. An independent
module is a module which is not derived from or based on this library.
If you modify this library, you must extend this exception to your
version of the library.
libzmq is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "testutil.hpp"
#include "testutil_unity.hpp"
#include <unity.h>
void *sb;
void *sc;
void setUp ()
{
setup_test_context ();
sb = test_context_socket (ZMQ_CHANNEL);
TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (sb, "inproc://a"));
sc = test_context_socket (ZMQ_CHANNEL);
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sc, "inproc://a"));
}
void tearDown ()
{
test_context_socket_close (sc);
test_context_socket_close (sb);
teardown_test_context ();
}
void test_roundtrip ()
{
send_string_expect_success (sb, "HELLO", 0);
recv_string_expect_success (sc, "HELLO", 0);
send_string_expect_success (sc, "WORLD", 0);
recv_string_expect_success (sb, "WORLD", 0);
}
void test_sndmore_fails ()
{
int rc = zmq_send (sc, "X", 1, ZMQ_SNDMORE);
TEST_ASSERT_EQUAL_INT (-1, rc);
TEST_ASSERT_EQUAL_INT (EINVAL, errno);
}
int main ()
{
setup_test_environment ();
UNITY_BEGIN ();
RUN_TEST (test_roundtrip);
RUN_TEST (test_sndmore_fails);
return UNITY_END ();
}