0
0
mirror of https://github.com/zeromq/libzmq.git synced 2024-12-27 15:41:05 +08:00

Merge pull request #3814 from bluca/sub_cancel_decoder

Implement ZMTP 3.1 subscribe/cancel via commands
This commit is contained in:
Simon Giesecke 2020-02-07 09:26:21 +01:00 committed by GitHub
commit f17a794d59
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 437 additions and 133 deletions

View File

@ -865,6 +865,7 @@ set(cxx-sources
v1_encoder.cpp
v2_decoder.cpp
v2_encoder.cpp
v3_1_encoder.cpp
xpub.cpp
xsub.cpp
zmq.cpp
@ -1007,6 +1008,7 @@ set(cxx-sources
v1_encoder.hpp
v2_decoder.hpp
v2_encoder.hpp
v3_1_encoder.hpp
v2_protocol.hpp
vmci.hpp
vmci_address.hpp

View File

@ -239,6 +239,8 @@ src_libzmq_la_SOURCES = \
src/v1_encoder.hpp \
src/v2_encoder.cpp \
src/v2_encoder.hpp \
src/v3_1_encoder.cpp \
src/v3_1_encoder.hpp \
src/v2_protocol.hpp \
src/vmci.cpp \
src/vmci.hpp \

View File

@ -212,6 +212,36 @@ int zmq::msg_t::init_leave ()
return 0;
}
int zmq::msg_t::init_subscribe (const size_t size_, const unsigned char *topic)
{
int rc = init_size (size_);
if (rc == 0) {
set_flags (zmq::msg_t::subscribe);
// We explicitly allow a NULL subscription with size zero
if (size_) {
assert (topic);
memcpy (data (), topic, size_);
}
}
return rc;
}
int zmq::msg_t::init_cancel (const size_t size_, const unsigned char *topic)
{
int rc = init_size (size_);
if (rc == 0) {
set_flags (zmq::msg_t::cancel);
// We explicitly allow a NULL subscription with size zero
if (size_) {
assert (topic);
memcpy (data (), topic, size_);
}
}
return rc;
}
int zmq::msg_t::close ()
{
// Check the validity of the message.
@ -487,9 +517,12 @@ size_t zmq::msg_t::command_body_size () const
{
if (this->is_ping () || this->is_pong ())
return this->size () - ping_cmd_name_size;
if (this->is_subscribe ())
else if (!(this->flags () & msg_t::command)
&& (this->is_subscribe () || this->is_cancel ()))
return this->size ();
else if (this->is_subscribe ())
return this->size () - sub_cmd_name_size;
if (this->is_cancel ())
else if (this->is_cancel ())
return this->size () - cancel_cmd_name_size;
return 0;
@ -498,12 +531,17 @@ size_t zmq::msg_t::command_body_size () const
void *zmq::msg_t::command_body ()
{
unsigned char *data = NULL;
if (this->is_ping () || this->is_pong ())
data =
static_cast<unsigned char *> (this->data ()) + ping_cmd_name_size;
if (this->is_subscribe ())
// With inproc, command flag is not set for sub/cancel
else if (!(this->flags () & msg_t::command)
&& (this->is_subscribe () || this->is_cancel ()))
data = static_cast<unsigned char *> (this->data ());
else if (this->is_subscribe ())
data = static_cast<unsigned char *> (this->data ()) + sub_cmd_name_size;
if (this->is_cancel ())
else if (this->is_cancel ())
data =
static_cast<unsigned char *> (this->data ()) + cancel_cmd_name_size;

View File

@ -54,6 +54,9 @@ namespace zmq
// Note that this structure needs to be explicitly constructed
// (init functions) and destructed (close function).
static const char cancel_cmd_name[] = "\6CANCEL";
static const char sub_cmd_name[] = "\x9SUBSCRIBE";
class msg_t
{
public:
@ -109,6 +112,8 @@ class msg_t
int init_delimiter ();
int init_join ();
int init_leave ();
int init_subscribe (const size_t size_, const unsigned char *topic);
int init_cancel (const size_t size_, const unsigned char *topic);
int close ();
int move (msg_t &src_);
int copy (msg_t &src_);

View File

@ -56,15 +56,15 @@ int zmq::sub_t::xsetsockopt (int option_,
// Create the subscription message.
msg_t msg;
int rc = msg.init_size (optvallen_ + 1);
errno_assert (rc == 0);
unsigned char *data = static_cast<unsigned char *> (msg.data ());
*data = (option_ == ZMQ_SUBSCRIBE);
// We explicitly allow a NULL subscription with size zero
if (optvallen_) {
assert (optval_);
memcpy (data + 1, optval_, optvallen_);
int rc;
const unsigned char *data = static_cast<const unsigned char *> (optval_);
if (option_ == ZMQ_SUBSCRIBE) {
rc = msg.init_subscribe (optvallen_, data);
} else {
rc = msg.init_cancel (optvallen_, data);
}
errno_assert (rc == 0);
// Pass it further on in the stack.
rc = xsub_t::xsend (&msg);
return close_and_return (&msg, rc);

View File

@ -55,23 +55,41 @@ void zmq::v1_encoder_t::size_ready ()
void zmq::v1_encoder_t::message_ready ()
{
size_t header_size = 2; // flags byte + size byte
// Get the message size.
size_t size = in_progress ()->size ();
// Account for the 'flags' byte.
size++;
// Account for the subscribe/cancel byte.
if (in_progress ()->is_subscribe () || in_progress ()->is_cancel ())
size++;
// For messages less than 255 bytes long, write one byte of message size.
// For longer messages write 0xff escape character followed by 8-byte
// message size. In both cases 'flags' field follows.
if (size < UCHAR_MAX) {
_tmpbuf[0] = static_cast<unsigned char> (size);
_tmpbuf[1] = (in_progress ()->flags () & msg_t::more);
next_step (_tmpbuf, 2, &v1_encoder_t::size_ready, false);
} else {
_tmpbuf[0] = UCHAR_MAX;
put_uint64 (_tmpbuf + 1, size);
_tmpbuf[9] = (in_progress ()->flags () & msg_t::more);
next_step (_tmpbuf, 10, &v1_encoder_t::size_ready, false);
header_size = 10;
}
// Encode the subscribe/cancel byte. This is done in the encoder as
// opposed to when the subscribe message is created to allow different
// protocol behaviour on the wire in the v3.1 and legacy encoders.
// It results in the work being done multiple times in case the sub
// is sending the subscription/cancel to multiple pubs, but it cannot
// be avoided. This processing can be moved to xsub once support for
// ZMTP < 3.1 is dropped.
if (in_progress ()->is_subscribe ())
_tmpbuf[header_size++] = 1;
else if (in_progress ()->is_cancel ())
_tmpbuf[header_size++] = 0;
next_step (_tmpbuf, header_size, &v1_encoder_t::size_ready, false);
}

View File

@ -46,7 +46,7 @@ class v1_encoder_t ZMQ_FINAL : public encoder_base_t<v1_encoder_t>
void size_ready ();
void message_ready ();
unsigned char _tmpbuf[10];
unsigned char _tmpbuf[11];
ZMQ_NON_COPYABLE_NOR_MOVABLE (v1_encoder_t)
};

View File

@ -50,6 +50,8 @@ zmq::v2_encoder_t::~v2_encoder_t ()
void zmq::v2_encoder_t::message_ready ()
{
// Encode flags.
size_t size = in_progress ()->size ();
size_t header_size = 2; // flags byte + size byte
unsigned char &protocol_flags = _tmp_buf[0];
protocol_flags = 0;
if (in_progress ()->flags () & msg_t::more)
@ -58,18 +60,32 @@ void zmq::v2_encoder_t::message_ready ()
protocol_flags |= v2_protocol_t::large_flag;
if (in_progress ()->flags () & msg_t::command)
protocol_flags |= v2_protocol_t::command_flag;
if (in_progress ()->is_subscribe () || in_progress ()->is_cancel ())
++size;
// Encode the message length. For messages less then 256 bytes,
// the length is encoded as 8-bit unsigned integer. For larger
// messages, 64-bit unsigned integer in network byte order is used.
const size_t size = in_progress ()->size ();
if (unlikely (size > UCHAR_MAX)) {
put_uint64 (_tmp_buf + 1, size);
next_step (_tmp_buf, 9, &v2_encoder_t::size_ready, false);
header_size = 9; // flags byte + size 8 bytes
} else {
_tmp_buf[1] = static_cast<uint8_t> (size);
next_step (_tmp_buf, 2, &v2_encoder_t::size_ready, false);
}
// Encode the subscribe/cancel byte. This is done in the encoder as
// opposed to when the subscribe message is created to allow different
// protocol behaviour on the wire in the v3.1 and legacy encoders.
// It results in the work being done multiple times in case the sub
// is sending the subscription/cancel to multiple pubs, but it cannot
// be avoided. This processing can be moved to xsub once support for
// ZMTP < 3.1 is dropped.
if (in_progress ()->is_subscribe ())
_tmp_buf[header_size++] = 1;
else if (in_progress ()->is_cancel ())
_tmp_buf[header_size++] = 0;
next_step (_tmp_buf, header_size, &v2_encoder_t::size_ready, false);
}
void zmq::v2_encoder_t::size_ready ()

View File

@ -46,7 +46,8 @@ class v2_encoder_t ZMQ_FINAL : public encoder_base_t<v2_encoder_t>
void size_ready ();
void message_ready ();
unsigned char _tmp_buf[9];
// flags byte + size byte (or 8 bytes) + sub/cancel byte
unsigned char _tmp_buf[10];
ZMQ_NON_COPYABLE_NOR_MOVABLE (v2_encoder_t)
};

105
src/v3_1_encoder.cpp Normal file
View File

@ -0,0 +1,105 @@
/*
Copyright (c) 2020 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
libzmq is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License (LGPL) as published
by the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
As a special exception, the Contributors give you permission to link
this library with independent modules to produce an executable,
regardless of the license terms of these independent modules, and to
copy and distribute the resulting executable under terms of your choice,
provided that you also meet, for each linked independent module, the
terms and conditions of the license of that module. An independent
module is a module which is not derived from or based on this library.
If you modify this library, you must extend this exception to your
version of the library.
libzmq is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "precompiled.hpp"
#include "v2_protocol.hpp"
#include "v3_1_encoder.hpp"
#include "msg.hpp"
#include "likely.hpp"
#include "wire.hpp"
#include <limits.h>
zmq::v3_1_encoder_t::v3_1_encoder_t (size_t bufsize_) :
encoder_base_t<v3_1_encoder_t> (bufsize_)
{
// Write 0 bytes to the batch and go to message_ready state.
next_step (NULL, 0, &v3_1_encoder_t::message_ready, true);
}
zmq::v3_1_encoder_t::~v3_1_encoder_t ()
{
}
void zmq::v3_1_encoder_t::message_ready ()
{
// Encode flags.
size_t size = in_progress ()->size ();
size_t header_size = 2; // flags byte + size byte
unsigned char &protocol_flags = _tmp_buf[0];
protocol_flags = 0;
if (in_progress ()->flags () & msg_t::more)
protocol_flags |= v2_protocol_t::more_flag;
if (in_progress ()->size () > UCHAR_MAX)
protocol_flags |= v2_protocol_t::large_flag;
if (in_progress ()->flags () & msg_t::command
|| in_progress ()->is_subscribe () || in_progress ()->is_cancel ()) {
protocol_flags |= v2_protocol_t::command_flag;
if (in_progress ()->is_subscribe ())
size += zmq::msg_t::sub_cmd_name_size;
else if (in_progress ()->is_cancel ())
size += zmq::msg_t::cancel_cmd_name_size;
}
// Encode the message length. For messages less then 256 bytes,
// the length is encoded as 8-bit unsigned integer. For larger
// messages, 64-bit unsigned integer in network byte order is used.
if (unlikely (size > UCHAR_MAX)) {
put_uint64 (_tmp_buf + 1, size);
header_size = 9; // flags byte + size 8 bytes
} else {
_tmp_buf[1] = static_cast<uint8_t> (size);
}
// Encode the sub/cancel command string. This is done in the encoder as
// opposed to when the subscribe message is created to allow different
// protocol behaviour on the wire in the v3.1 and legacy encoders.
// It results in the work being done multiple times in case the sub
// is sending the subscription/cancel to multiple pubs, but it cannot
// be avoided. This processing can be moved to xsub once support for
// ZMTP < 3.1 is dropped.
if (in_progress ()->is_subscribe ()) {
memcpy (_tmp_buf + header_size, zmq::sub_cmd_name,
zmq::msg_t::sub_cmd_name_size);
header_size += zmq::msg_t::sub_cmd_name_size;
} else if (in_progress ()->is_cancel ()) {
memcpy (_tmp_buf + header_size, zmq::cancel_cmd_name,
zmq::msg_t::cancel_cmd_name_size);
header_size += zmq::msg_t::cancel_cmd_name_size;
}
next_step (_tmp_buf, header_size, &v3_1_encoder_t::size_ready, false);
}
void zmq::v3_1_encoder_t::size_ready ()
{
// Write message body into the buffer.
next_step (in_progress ()->data (), in_progress ()->size (),
&v3_1_encoder_t::message_ready, true);
}

56
src/v3_1_encoder.hpp Normal file
View File

@ -0,0 +1,56 @@
/*
Copyright (c) 2020 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
libzmq is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License (LGPL) as published
by the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
As a special exception, the Contributors give you permission to link
this library with independent modules to produce an executable,
regardless of the license terms of these independent modules, and to
copy and distribute the resulting executable under terms of your choice,
provided that you also meet, for each linked independent module, the
terms and conditions of the license of that module. An independent
module is a module which is not derived from or based on this library.
If you modify this library, you must extend this exception to your
version of the library.
libzmq is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_V3_1_ENCODER_HPP_INCLUDED__
#define __ZMQ_V3_1_ENCODER_HPP_INCLUDED__
#include "encoder.hpp"
#include "msg.hpp"
namespace zmq
{
// Encoder for 0MQ framing protocol. Converts messages into data stream.
class v3_1_encoder_t ZMQ_FINAL : public encoder_base_t<v3_1_encoder_t>
{
public:
v3_1_encoder_t (size_t bufsize_);
~v3_1_encoder_t () ZMQ_FINAL;
private:
void size_ready ();
void message_ready ();
unsigned char _tmp_buf[9 + zmq::msg_t::sub_cmd_name_size];
ZMQ_NON_COPYABLE_NOR_MOVABLE (v3_1_encoder_t)
};
}
#endif

View File

@ -102,6 +102,7 @@ void zmq::xpub_t::xread_activated (pipe_t *pipe_)
size_t size = 0;
bool subscribe = false;
bool is_subscribe_or_cancel = false;
bool notify = false;
const bool first_part = !_more_recv;
_more_recv = (msg.flags () & msg_t::more) != 0;
@ -144,25 +145,7 @@ void zmq::xpub_t::xread_activated (pipe_t *pipe_)
_manual_subscriptions.add (data, size, pipe_);
_pending_pipes.push_back (pipe_);
// ZMTP 3.1 hack: we need to support sub/cancel commands, but
// we can't give them back to userspace as it would be an API
// breakage since the payload of the message is completely
// different. Manually craft an old-style message instead.
data = data - 1;
size = size + 1;
if (subscribe)
*data = 1;
else
*data = 0;
_pending_data.push_back (blob_t (data, size));
if (metadata)
metadata->add_ref ();
_pending_metadata.push_back (metadata);
_pending_flags.push_back (0);
} else {
bool notify;
if (!subscribe) {
const mtrie_t::rm_result rm_result =
_subscriptions.rm (data, size, pipe_);
@ -172,25 +155,36 @@ void zmq::xpub_t::xread_activated (pipe_t *pipe_)
const bool first_added = _subscriptions.add (data, size, pipe_);
notify = first_added || _verbose_subs;
}
// If the request was a new subscription, or the subscription
// was removed, or verbose mode is enabled, store it so that
// it can be passed to the user on next recv call.
if (options.type == ZMQ_XPUB && notify) {
data = data - 1;
size = size + 1;
if (subscribe)
*data = 1;
else
*data = 0;
_pending_data.push_back (blob_t (data, size));
if (metadata)
metadata->add_ref ();
_pending_metadata.push_back (metadata);
_pending_flags.push_back (0);
}
}
// If the request was a new subscription, or the subscription
// was removed, or verbose mode or manual mode are enabled, store it
// so that it can be passed to the user on next recv call.
if (_manual || (options.type == ZMQ_XPUB && notify)) {
// ZMTP 3.1 hack: we need to support sub/cancel commands, but
// we can't give them back to userspace as it would be an API
// breakage since the payload of the message is completely
// different. Manually craft an old-style message instead.
// Although with other transports it would be possible to simply
// reuse the same buffer and prefix a 0/1 byte to the topic, with
// inproc the subscribe/cancel command string is not present in
// the message, so this optimization is not possible.
// The pushback makes a copy of the data array anyway, so the
// number of buffer copies does not change.
blob_t notification (size + 1);
if (subscribe)
*notification.data () = 1;
else
*notification.data () = 0;
memcpy (notification.data () + 1, data, size);
_pending_data.push_back (ZMQ_MOVE (notification));
if (metadata)
metadata->add_ref ();
_pending_metadata.push_back (metadata);
_pending_flags.push_back (0);
}
msg.close ();
}
}

View File

@ -135,10 +135,7 @@ int zmq::xsub_t::xsend (msg_t *msg_)
// however this is alread done on the XPUB side and
// doing it here as well breaks ZMQ_XPUB_VERBOSE
// when there are forwarding devices involved.
if (msg_->is_subscribe ()) {
data = static_cast<unsigned char *> (msg_->command_body ());
size = msg_->command_body_size ();
} else {
if (!msg_->is_subscribe ()) {
data = data + 1;
size = size - 1;
}
@ -148,10 +145,7 @@ int zmq::xsub_t::xsend (msg_t *msg_)
}
if (msg_->is_cancel () || (size > 0 && *data == 0)) {
// Process unsubscribe message
if (msg_->is_cancel ()) {
data = static_cast<unsigned char *> (msg_->command_body ());
size = msg_->command_body_size ();
} else {
if (!msg_->is_cancel ()) {
data = data + 1;
size = size - 1;
}
@ -271,16 +265,8 @@ void zmq::xsub_t::send_subscription (unsigned char *data_,
// Create the subscription message.
msg_t msg;
const int rc = msg.init_size (size_ + 1);
const int rc = msg.init_subscribe (size_, data_);
errno_assert (rc == 0);
unsigned char *data = static_cast<unsigned char *> (msg.data ());
data[0] = 1;
// We explicitly allow a NULL subscription with size zero
if (size_) {
assert (data_);
memcpy (data + 1, data_, size_);
}
// Send it to the pipe.
const bool sent = pipe->write (&msg);

View File

@ -47,6 +47,7 @@
#include "v1_decoder.hpp"
#include "v2_encoder.hpp"
#include "v2_decoder.hpp"
#include "v3_1_encoder.hpp"
#include "null_mechanism.hpp"
#include "plain_client.hpp"
#include "plain_server.hpp"
@ -115,8 +116,9 @@ void zmq::zmtp_engine_t::plug_internal ()
in_event ();
}
// Position of the revision field in the greeting.
// Position of the revision and minor fields in the greeting.
const size_t revision_pos = 10;
const size_t minor_pos = 11;
bool zmq::zmtp_engine_t::handshake ()
{
@ -128,8 +130,8 @@ bool zmq::zmtp_engine_t::handshake ()
const bool unversioned = rc != 0;
if (!(this
->*select_handshake_fun (unversioned,
_greeting_recv[revision_pos])) ())
->*select_handshake_fun (unversioned, _greeting_recv[revision_pos],
_greeting_recv[minor_pos])) ())
return false;
// Start polling for output if necessary.
@ -203,7 +205,7 @@ void zmq::zmtp_engine_t::receive_greeting_versioned ()
|| _greeting_recv[revision_pos] == ZMTP_2_0)
_outpos[_outsize++] = _options.type;
else {
_outpos[_outsize++] = 0; // Minor version number
_outpos[_outsize++] = 1; // Minor version number
memset (_outpos + _outsize, 0, 20);
zmq_assert (_options.mechanism == ZMQ_NULL
@ -228,9 +230,8 @@ void zmq::zmtp_engine_t::receive_greeting_versioned ()
}
}
zmq::zmtp_engine_t::handshake_fun_t
zmq::zmtp_engine_t::select_handshake_fun (bool unversioned_,
unsigned char revision_)
zmq::zmtp_engine_t::handshake_fun_t zmq::zmtp_engine_t::select_handshake_fun (
bool unversioned_, unsigned char revision_, unsigned char minor_)
{
// Is the peer using ZMTP/1.0 with no revision number?
if (unversioned_) {
@ -241,8 +242,15 @@ zmq::zmtp_engine_t::select_handshake_fun (bool unversioned_,
return &zmtp_engine_t::handshake_v1_0;
case ZMTP_2_0:
return &zmtp_engine_t::handshake_v2_0;
case ZMTP_3_x:
switch (minor_) {
case 0:
return &zmtp_engine_t::handshake_v3_0;
default:
return &zmtp_engine_t::handshake_v3_1;
}
default:
return &zmtp_engine_t::handshake_v3_0;
return &zmtp_engine_t::handshake_v3_1;
}
}
@ -339,15 +347,8 @@ bool zmq::zmtp_engine_t::handshake_v2_0 ()
return true;
}
bool zmq::zmtp_engine_t::handshake_v3_0 ()
bool zmq::zmtp_engine_t::handshake_v3_x ()
{
_encoder = new (std::nothrow) v2_encoder_t (_options.out_batch_size);
alloc_assert (_encoder);
_decoder = new (std::nothrow) v2_decoder_t (
_options.in_batch_size, _options.maxmsgsize, _options.zero_copy);
alloc_assert (_decoder);
if (_options.mechanism == ZMQ_NULL
&& memcmp (_greeting_recv + 12, "NULL\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0",
20)
@ -408,6 +409,30 @@ bool zmq::zmtp_engine_t::handshake_v3_0 ()
return true;
}
bool zmq::zmtp_engine_t::handshake_v3_0 ()
{
_encoder = new (std::nothrow) v2_encoder_t (_options.out_batch_size);
alloc_assert (_encoder);
_decoder = new (std::nothrow) v2_decoder_t (
_options.in_batch_size, _options.maxmsgsize, _options.zero_copy);
alloc_assert (_decoder);
return zmq::zmtp_engine_t::handshake_v3_x ();
}
bool zmq::zmtp_engine_t::handshake_v3_1 ()
{
_encoder = new (std::nothrow) v3_1_encoder_t (_options.out_batch_size);
alloc_assert (_encoder);
_decoder = new (std::nothrow) v2_decoder_t (
_options.in_batch_size, _options.maxmsgsize, _options.zero_copy);
alloc_assert (_decoder);
return zmq::zmtp_engine_t::handshake_v3_x ();
}
int zmq::zmtp_engine_t::routing_id_msg (msg_t *msg_)
{
const int rc = msg_->init_size (_options.routing_id_size);

View File

@ -49,7 +49,8 @@ namespace zmq
enum
{
ZMTP_1_0 = 0,
ZMTP_2_0 = 1
ZMTP_2_0 = 1,
ZMTP_3_x = 3
};
class io_thread_t;
@ -85,12 +86,15 @@ class zmtp_engine_t ZMQ_FINAL : public stream_engine_base_t
typedef bool (zmtp_engine_t::*handshake_fun_t) ();
static handshake_fun_t select_handshake_fun (bool unversioned,
unsigned char revision);
unsigned char revision,
unsigned char minor);
bool handshake_v1_0_unversioned ();
bool handshake_v1_0 ();
bool handshake_v2_0 ();
bool handshake_v3_x ();
bool handshake_v3_0 ();
bool handshake_v3_1 ();
int routing_id_msg (msg_t *msg_);
int process_routing_id_msg (msg_t *msg_);

View File

@ -81,7 +81,7 @@ static void recv_with_retry (raw_socket fd_, char *buffer_, int bytes_)
}
}
static void mock_handshake (raw_socket fd_)
static void mock_handshake (raw_socket fd_, bool sub_command, bool mock_pub)
{
const uint8_t zmtp_greeting[33] = {0xff, 0, 0, 0, 0, 0, 0, 0, 0,
0x7f, 3, 0, 'N', 'U', 'L', 'L', 0};
@ -89,31 +89,44 @@ static void mock_handshake (raw_socket fd_)
memset (buffer, 0, sizeof (buffer));
memcpy (buffer, zmtp_greeting, sizeof (zmtp_greeting));
// Mock ZMTP 3.1 which uses commands
if (sub_command) {
buffer[11] = 1;
}
int rc = TEST_ASSERT_SUCCESS_RAW_ERRNO (send (fd_, buffer, 64, 0));
TEST_ASSERT_EQUAL_INT (64, rc);
recv_with_retry (fd_, buffer, 64);
const uint8_t zmtp_ready[27] = {
4, 25, 5, 'R', 'E', 'A', 'D', 'Y', 11, 'S', 'o', 'c', 'k', 'e',
't', '-', 'T', 'y', 'p', 'e', 0, 0, 0, 3, 'S', 'U', 'B'};
if (!mock_pub) {
const uint8_t zmtp_ready[27] = {
4, 25, 5, 'R', 'E', 'A', 'D', 'Y', 11, 'S', 'o', 'c', 'k', 'e',
't', '-', 'T', 'y', 'p', 'e', 0, 0, 0, 3, 'S', 'U', 'B'};
rc = TEST_ASSERT_SUCCESS_RAW_ERRNO (
send (fd_, (const char *) zmtp_ready, 27, 0));
TEST_ASSERT_EQUAL_INT (27, rc);
} else {
const uint8_t zmtp_ready[28] = {
4, 26, 5, 'R', 'E', 'A', 'D', 'Y', 11, 'S', 'o', 'c', 'k', 'e',
't', '-', 'T', 'y', 'p', 'e', 0, 0, 0, 4, 'X', 'P', 'U', 'B'};
rc = TEST_ASSERT_SUCCESS_RAW_ERRNO (
send (fd_, (const char *) zmtp_ready, 28, 0));
TEST_ASSERT_EQUAL_INT (28, rc);
}
// greeting - XPUB has one extra byte
memset (buffer, 0, sizeof (buffer));
memcpy (buffer, zmtp_ready, 27);
rc = TEST_ASSERT_SUCCESS_RAW_ERRNO (send (fd_, buffer, 27, 0));
TEST_ASSERT_EQUAL_INT (27, rc);
// greeting - XPUB so one extra byte
recv_with_retry (fd_, buffer, 28);
recv_with_retry (fd_, buffer, mock_pub ? 27 : 28);
}
static void prep_server_socket (void **server_out_,
void **mon_out_,
char *endpoint_,
size_t ep_length_)
size_t ep_length_,
int socket_type)
{
// We'll be using this socket in raw mode
void *server = test_context_socket (ZMQ_XPUB);
void *server = test_context_socket (socket_type);
int value = 0;
TEST_ASSERT_SUCCESS_ERRNO (
@ -136,13 +149,14 @@ static void prep_server_socket (void **server_out_,
*mon_out_ = server_mon;
}
static void test_mock_sub (bool sub_command_)
static void test_mock_pub_sub (bool sub_command_, bool mock_pub_)
{
int rc;
char my_endpoint[MAX_SOCKET_STRING];
void *server, *server_mon;
prep_server_socket (&server, &server_mon, my_endpoint, MAX_SOCKET_STRING);
prep_server_socket (&server, &server_mon, my_endpoint, MAX_SOCKET_STRING,
mock_pub_ ? ZMQ_SUB : ZMQ_XPUB);
struct sockaddr_in ip4addr;
raw_socket s;
@ -161,37 +175,63 @@ static void test_mock_sub (bool sub_command_)
TEST_ASSERT_GREATER_THAN_INT (-1, rc);
// Mock a ZMTP 3 client so we can forcibly try sub commands
mock_handshake (s);
mock_handshake (s, sub_command_, mock_pub_);
// By now everything should report as connected
rc = get_monitor_event (server_mon);
TEST_ASSERT_EQUAL_INT (ZMQ_EVENT_ACCEPTED, rc);
if (sub_command_) {
const uint8_t sub[13] = {4, 11, 9, 'S', 'U', 'B', 'S',
'C', 'R', 'I', 'B', 'E', 'A'};
rc =
TEST_ASSERT_SUCCESS_RAW_ERRNO (send (s, (const char *) sub, 13, 0));
TEST_ASSERT_EQUAL_INT (13, rc);
} else {
const uint8_t sub[4] = {0, 2, 1, 'A'};
rc = TEST_ASSERT_SUCCESS_RAW_ERRNO (send (s, (const char *) sub, 4, 0));
char buffer[32];
memset (buffer, 0, sizeof (buffer));
if (mock_pub_) {
rc = zmq_setsockopt (server, ZMQ_SUBSCRIBE, "A", 1);
TEST_ASSERT_EQUAL_INT (0, rc);
// SUB binds, let its state machine run
zmq_recv (server, buffer, 16, ZMQ_DONTWAIT);
if (sub_command_) {
recv_with_retry (s, buffer, 13);
TEST_ASSERT_EQUAL_INT (0,
memcmp (buffer, "\4\xb\x9SUBSCRIBEA", 13));
} else {
recv_with_retry (s, buffer, 4);
TEST_ASSERT_EQUAL_INT (0, memcmp (buffer, "\0\2\1A", 4));
}
memcpy (buffer, "\0\4ALOL", 6);
rc = TEST_ASSERT_SUCCESS_RAW_ERRNO (send (s, buffer, 6, 0));
TEST_ASSERT_EQUAL_INT (6, rc);
memset (buffer, 0, sizeof (buffer));
rc = zmq_recv (server, buffer, 4, 0);
TEST_ASSERT_EQUAL_INT (4, rc);
TEST_ASSERT_EQUAL_INT (0, memcmp (buffer, "ALOL", 4));
} else {
if (sub_command_) {
const uint8_t sub[13] = {4, 11, 9, 'S', 'U', 'B', 'S',
'C', 'R', 'I', 'B', 'E', 'A'};
rc = TEST_ASSERT_SUCCESS_RAW_ERRNO (
send (s, (const char *) sub, 13, 0));
TEST_ASSERT_EQUAL_INT (13, rc);
} else {
const uint8_t sub[4] = {0, 2, 1, 'A'};
rc = TEST_ASSERT_SUCCESS_RAW_ERRNO (
send (s, (const char *) sub, 4, 0));
TEST_ASSERT_EQUAL_INT (4, rc);
}
rc = zmq_recv (server, buffer, 2, 0);
TEST_ASSERT_EQUAL_INT (2, rc);
TEST_ASSERT_EQUAL_INT (0, memcmp (buffer, "\1A", 2));
rc = zmq_send (server, "ALOL", 4, 0);
TEST_ASSERT_EQUAL_INT (4, rc);
memset (buffer, 0, sizeof (buffer));
recv_with_retry (s, buffer, 6);
TEST_ASSERT_EQUAL_INT (0, memcmp (buffer, "\0\4ALOL", 6));
}
char buffer[16];
memset (buffer, 0, sizeof (buffer));
rc = zmq_recv (server, buffer, 2, 0);
TEST_ASSERT_EQUAL_INT (2, rc);
TEST_ASSERT_EQUAL_INT (0, memcmp (buffer, "\1A", 2));
rc = zmq_send (server, "ALOL", 4, 0);
TEST_ASSERT_EQUAL_INT (4, rc);
memset (buffer, 0, sizeof (buffer));
recv_with_retry (s, buffer, 6);
TEST_ASSERT_EQUAL_INT (0, memcmp (buffer, "\0\4ALOL", 6));
close (s);
test_context_socket_close (server);
@ -200,12 +240,22 @@ static void test_mock_sub (bool sub_command_)
void test_mock_sub_command ()
{
test_mock_sub (true);
test_mock_pub_sub (true, false);
}
void test_mock_sub_legacy ()
{
test_mock_sub (false);
test_mock_pub_sub (false, false);
}
void test_mock_pub_command ()
{
test_mock_pub_sub (true, true);
}
void test_mock_pub_legacy ()
{
test_mock_pub_sub (false, true);
}
int main (void)
@ -216,6 +266,8 @@ int main (void)
RUN_TEST (test_mock_sub_command);
RUN_TEST (test_mock_sub_legacy);
RUN_TEST (test_mock_pub_command);
RUN_TEST (test_mock_pub_legacy);
return UNITY_END ();
}

View File

@ -40,7 +40,7 @@ typedef uint8_t byte;
typedef struct
{
byte signature[10]; // 0xFF 8*0x00 0x7F
byte version[2]; // 0x03 0x00 for ZMTP/3.0
byte version[2]; // 0x03 0x01 for ZMTP/3.1
byte mechanism[20]; // "NULL"
byte as_server;
byte filler[31];
@ -52,7 +52,7 @@ typedef struct
// 8-byte size is set to 1 for backwards compatibility
static zmtp_greeting_t greeting = {
{0xFF, 0, 0, 0, 0, 0, 0, 0, 1, 0x7F}, {3, 0}, {'N', 'U', 'L', 'L'}, 0, {0}};
{0xFF, 0, 0, 0, 0, 0, 0, 0, 1, 0x7F}, {3, 1}, {'N', 'U', 'L', 'L'}, 0, {0}};
static void test_stream_to_dealer ()
{
@ -135,8 +135,8 @@ static void test_stream_to_dealer ()
}
// First two bytes are major and minor version numbers.
TEST_ASSERT_EQUAL_INT (3, buffer[0]); // ZMTP/3.0
TEST_ASSERT_EQUAL_INT (0, buffer[1]);
TEST_ASSERT_EQUAL_INT (3, buffer[0]); // ZMTP/3.1
TEST_ASSERT_EQUAL_INT (1, buffer[1]);
// Mechanism is "NULL"
TEST_ASSERT_EQUAL_INT8_ARRAY (buffer + 2,