0
0
mirror of https://github.com/zeromq/libzmq.git synced 2024-12-29 08:39:42 +08:00

Problem: code duplication in getsockopt/setsockopt

Solution: extracted common code into do_getsockopt/do_setsockopt functions
This commit is contained in:
Simon Giesecke 2018-03-06 11:16:22 +01:00
parent 494c2a71f8
commit 6f967c3a13
6 changed files with 264 additions and 256 deletions

View File

@ -44,6 +44,146 @@
#define BINDDEVSIZ 16
#endif
static int sockopt_invalid ()
{
#if defined(ZMQ_ACT_MILITANT)
zmq_assert (false);
#endif
errno = EINVAL;
return -1;
}
int zmq::do_getsockopt (void *const optval_,
size_t *const optvallen_,
const std::string &value_)
{
return do_getsockopt (optval_, optvallen_, value_.c_str (),
value_.size () + 1);
}
int zmq::do_getsockopt (void *const optval_,
size_t *const optvallen_,
const void *value_,
const size_t value_len_)
{
// TODO behaviour is inconsistent with options_t::getsockopt; there, an
// *exact* length match is required except for string-like (but not the
// CURVE keys!) (and therefore null-ing remaining memory is a no-op, see
// comment below)
if (*optvallen_ < value_len_) {
return sockopt_invalid ();
}
memcpy (optval_, value_, value_len_);
// TODO why is the remaining memory null-ed?
memset ((char *) optval_ + value_len_, 0, *optvallen_ - value_len_);
*optvallen_ = value_len_;
return 0;
}
#ifdef ZMQ_HAVE_CURVE
static int do_getsockopt_curve_key (void *const optval_,
size_t *const optvallen_,
const uint8_t (&curve_key_)[CURVE_KEYSIZE])
{
if (*optvallen_ == CURVE_KEYSIZE) {
memcpy (optval_, curve_key_, CURVE_KEYSIZE);
return 0;
} else if (*optvallen_ == CURVE_KEYSIZE_Z85 + 1) {
zmq_z85_encode ((char *) optval_, curve_key_, CURVE_KEYSIZE);
return 0;
}
return sockopt_invalid ();
}
#endif
template <typename T>
int do_setsockopt (const void *const optval_,
const size_t optvallen_,
T *const out_value_)
{
if (optvallen_ == sizeof (T)) {
memcpy (out_value_, optval_, sizeof (T));
return 0;
}
return sockopt_invalid ();
}
int zmq::do_setsockopt_int_as_bool_strict (const void *const optval_,
const size_t optvallen_,
bool *const out_value_)
{
// TODO handling of values other than 0 or 1 is not consistent,
// here it is disallowed, but for other options such as
// ZMQ_ROUTER_RAW any positive value is accepted
int value;
if (do_setsockopt (optval_, optvallen_, &value) == -1)
return -1;
if (value == 0 || value == 1) {
*out_value_ = (value != 0);
return 0;
}
return sockopt_invalid ();
}
int zmq::do_setsockopt_int_as_bool_relaxed (const void *const optval_,
const size_t optvallen_,
bool *const out_value_)
{
int value;
if (do_setsockopt (optval_, optvallen_, &value) == -1)
return -1;
*out_value_ = (value != 0);
return 0;
}
static int
do_setsockopt_string_allow_empty_strict (const void *const optval_,
const size_t optvallen_,
std::string *const out_value_,
const size_t max_len_)
{
// TODO why is optval_ != NULL not allowed in case of optvallen_== 0?
// TODO why are empty strings allowed for some socket options, but not for others?
if (optval_ == NULL && optvallen_ == 0) {
out_value_->clear ();
return 0;
} else if (optval_ != NULL && optvallen_ > 0 && optvallen_ <= max_len_) {
out_value_->assign ((const char *) optval_, optvallen_);
return 0;
}
return sockopt_invalid ();
}
static int
do_setsockopt_string_allow_empty_relaxed (const void *const optval_,
const size_t optvallen_,
std::string *const out_value_,
const size_t max_len_)
{
// TODO use either do_setsockopt_string_allow_empty_relaxed or
// do_setsockopt_string_allow_empty_strict everywhere
if (optvallen_ > 0 && optvallen_ <= max_len_) {
out_value_->assign ((const char *) optval_, optvallen_);
return 0;
}
return sockopt_invalid ();
}
template <typename T>
int do_setsockopt_set (const void *const optval_,
const size_t optvallen_,
std::set<T> *const set_)
{
if (optvallen_ == 0 && optval_ == NULL) {
set_->clear ();
return 0;
} else if (optvallen_ == sizeof (T) && optval_ != NULL) {
set_->insert (*((const T *) optval_));
return 0;
}
return sockopt_invalid ();
}
zmq::options_t::options_t () :
sndhwm (1000),
rcvhwm (1000),
@ -166,11 +306,7 @@ int zmq::options_t::setsockopt (int option_,
break;
case ZMQ_AFFINITY:
if (optvallen_ == sizeof (uint64_t)) {
affinity = *((uint64_t *) optval_);
return 0;
}
break;
return do_setsockopt (optval_, optvallen_, &affinity);
case ZMQ_ROUTING_ID:
// Routing id is any binary string from 1 to 255 octets
@ -259,11 +395,7 @@ int zmq::options_t::setsockopt (int option_,
break;
case ZMQ_MAXMSGSIZE:
if (optvallen_ == sizeof (int64_t)) {
maxmsgsize = *((int64_t *) optval_);
return 0;
}
break;
return do_setsockopt (optval_, optvallen_, &maxmsgsize);
case ZMQ_MULTICAST_HOPS:
if (is_int && value > 0) {
@ -294,31 +426,23 @@ int zmq::options_t::setsockopt (int option_,
break;
/* Deprecated in favor of ZMQ_IPV6 */
case ZMQ_IPV4ONLY:
if (is_int && (value == 0 || value == 1)) {
ipv6 = (value == 0);
return 0;
}
break;
case ZMQ_IPV4ONLY: {
bool value;
int rc =
do_setsockopt_int_as_bool_strict (optval_, optvallen_, &value);
if (rc == 0)
ipv6 = !value;
return rc;
}
/* To replace the somewhat surprising IPV4ONLY */
case ZMQ_IPV6:
if (is_int && (value == 0 || value == 1)) {
ipv6 = (value != 0);
return 0;
}
break;
return do_setsockopt_int_as_bool_strict (optval_, optvallen_,
&ipv6);
case ZMQ_SOCKS_PROXY:
if (optval_ == NULL && optvallen_ == 0) {
socks_proxy_address.clear ();
return 0;
} else if (optval_ != NULL && optvallen_ > 0) {
socks_proxy_address =
std::string ((const char *) optval_, optvallen_);
return 0;
}
break;
return do_setsockopt_string_allow_empty_strict (
optval_, optvallen_, &socks_proxy_address, SIZE_MAX);
case ZMQ_TCP_KEEPALIVE:
if (is_int && (value == -1 || value == 0 || value == 1)) {
@ -349,60 +473,46 @@ int zmq::options_t::setsockopt (int option_,
break;
case ZMQ_IMMEDIATE:
// TODO why is immediate not bool (and called non_immediate, as its meaning appears to be reversed)
if (is_int && (value == 0 || value == 1)) {
immediate = value;
return 0;
}
break;
case ZMQ_TCP_ACCEPT_FILTER:
if (optvallen_ == 0 && optval_ == NULL) {
tcp_accept_filters.clear ();
return 0;
} else if (optvallen_ > 0 && optvallen_ < 256 && optval_ != NULL
&& *((const char *) optval_) != 0) {
std::string filter_str ((const char *) optval_, optvallen_);
tcp_address_mask_t mask;
int rc = mask.resolve (filter_str.c_str (), ipv6);
if (rc == 0) {
tcp_accept_filters.push_back (mask);
return 0;
case ZMQ_TCP_ACCEPT_FILTER: {
std::string filter_str;
int rc = do_setsockopt_string_allow_empty_strict (
optval_, optvallen_, &filter_str, 255);
if (rc == 0) {
if (filter_str.empty ()) {
tcp_accept_filters.clear ();
} else {
tcp_address_mask_t mask;
rc = mask.resolve (filter_str.c_str (), ipv6);
if (rc == 0) {
tcp_accept_filters.push_back (mask);
}
}
}
break;
return rc;
}
#if defined ZMQ_HAVE_SO_PEERCRED || defined ZMQ_HAVE_LOCAL_PEERCRED
case ZMQ_IPC_FILTER_UID:
if (optvallen_ == 0 && optval_ == NULL) {
ipc_uid_accept_filters.clear ();
return 0;
} else if (optvallen_ == sizeof (uid_t) && optval_ != NULL) {
ipc_uid_accept_filters.insert (*((uid_t *) optval_));
return 0;
}
break;
return do_setsockopt_set (optval_, optvallen_,
&ipc_uid_accept_filters);
case ZMQ_IPC_FILTER_GID:
if (optvallen_ == 0 && optval_ == NULL) {
ipc_gid_accept_filters.clear ();
return 0;
} else if (optvallen_ == sizeof (gid_t) && optval_ != NULL) {
ipc_gid_accept_filters.insert (*((gid_t *) optval_));
return 0;
}
break;
return do_setsockopt_set (optval_, optvallen_,
&ipc_gid_accept_filters);
#endif
#if defined ZMQ_HAVE_SO_PEERCRED
case ZMQ_IPC_FILTER_PID:
if (optvallen_ == 0 && optval_ == NULL) {
ipc_pid_accept_filters.clear ();
return 0;
} else if (optvallen_ == sizeof (pid_t) && optval_ != NULL) {
ipc_pid_accept_filters.insert (*((pid_t *) optval_));
return 0;
}
break;
return do_setsockopt_set (optval_, optvallen_,
&ipc_pid_accept_filters);
#endif
case ZMQ_PLAIN_SERVER:
@ -438,10 +548,8 @@ int zmq::options_t::setsockopt (int option_,
break;
case ZMQ_ZAP_DOMAIN:
if (optvallen_ < 256) {
zap_domain.assign ((const char *) optval_, optvallen_);
return 0;
}
return do_setsockopt_string_allow_empty_relaxed (
optval_, optvallen_, &zap_domain, 255);
break;
// If curve encryption isn't built, these options provoke EINVAL
@ -475,11 +583,8 @@ int zmq::options_t::setsockopt (int option_,
#endif
case ZMQ_CONFLATE:
if (is_int && (value == 0 || value == 1)) {
conflate = (value != 0);
return 0;
}
break;
return do_setsockopt_int_as_bool_strict (optval_, optvallen_,
&conflate);
// If libgssapi isn't installed, these options provoke EINVAL
#ifdef HAVE_LIBGSSAPI_KRB5
@ -510,11 +615,8 @@ int zmq::options_t::setsockopt (int option_,
break;
case ZMQ_GSSAPI_PLAINTEXT:
if (is_int && (value == 0 || value == 1)) {
gss_plaintext = (value != 0);
return 0;
}
break;
return do_setsockopt_int_as_bool_strict (optval_, optvallen_,
&gss_plaintext);
case ZMQ_GSSAPI_PRINCIPAL_NAMETYPE:
if (is_int
@ -545,11 +647,8 @@ int zmq::options_t::setsockopt (int option_,
break;
case ZMQ_INVERT_MATCHING:
if (is_int) {
invert_matching = (value != 0);
return 0;
}
break;
return do_setsockopt_int_as_bool_relaxed (optval_, optvallen_,
&invert_matching);
case ZMQ_HEARTBEAT_IVL:
if (is_int && value >= 0) {
@ -576,32 +675,16 @@ int zmq::options_t::setsockopt (int option_,
#ifdef ZMQ_HAVE_VMCI
case ZMQ_VMCI_BUFFER_SIZE:
if (optvallen_ == sizeof (uint64_t)) {
vmci_buffer_size = *((uint64_t *) optval_);
return 0;
}
break;
return do_setsockopt (optval_, optvallen_, &vmci_buffer_size);
case ZMQ_VMCI_BUFFER_MIN_SIZE:
if (optvallen_ == sizeof (uint64_t)) {
vmci_buffer_min_size = *((uint64_t *) optval_);
return 0;
}
break;
return do_setsockopt (optval_, optvallen_, &vmci_buffer_min_size);
case ZMQ_VMCI_BUFFER_MAX_SIZE:
if (optvallen_ == sizeof (uint64_t)) {
vmci_buffer_max_size = *((uint64_t *) optval_);
return 0;
}
break;
return do_setsockopt (optval_, optvallen_, &vmci_buffer_max_size);
case ZMQ_VMCI_CONNECT_TIMEOUT:
if (optvallen_ == sizeof (int)) {
vmci_connect_timeout = *((int *) optval_);
return 0;
}
break;
return do_setsockopt (optval_, optvallen_, &vmci_connect_timeout);
#endif
case ZMQ_USE_FD:
@ -612,30 +695,16 @@ int zmq::options_t::setsockopt (int option_,
break;
case ZMQ_BINDTODEVICE:
if (optval_ == NULL && optvallen_ == 0) {
bound_device.clear ();
return 0;
} else if (optval_ != NULL && optvallen_ > 0
&& optvallen_ <= BINDDEVSIZ) {
bound_device = std::string ((const char *) optval_, optvallen_);
return 0;
}
break;
return do_setsockopt_string_allow_empty_strict (
optval_, optvallen_, &bound_device, BINDDEVSIZ);
case ZMQ_ZAP_ENFORCE_DOMAIN:
if (is_int) {
zap_enforce_domain = (value != 0);
return 0;
}
break;
return do_setsockopt_int_as_bool_relaxed (optval_, optvallen_,
&zap_enforce_domain);
case ZMQ_LOOPBACK_FASTPATH:
if (is_int) {
loopback_fastpath = (value != 0);
return 0;
}
break;
return do_setsockopt_int_as_bool_relaxed (optval_, optvallen_,
&loopback_fastpath);
default:
#if defined(ZMQ_ACT_MILITANT)
@ -647,6 +716,12 @@ int zmq::options_t::setsockopt (int option_,
#endif
break;
}
// TODO mechanism should either be set explicitly, or determined when
// connecting. currently, it depends on the order of setsockopt calls
// if there is some inconsistency, which is confusing. in addition,
// the assumed or set mechanism should be queryable (as a socket option)
#if defined(ZMQ_ACT_MILITANT)
// There is no valid use case for passing an error back to the application
// when it sent malformed arguments to a socket option. Use ./configure
@ -691,11 +766,8 @@ int zmq::options_t::getsockopt (int option_,
break;
case ZMQ_ROUTING_ID:
if (*optvallen_ >= routing_id_size) {
memcpy (optval_, routing_id, routing_id_size);
*optvallen_ = routing_id_size;
return 0;
}
return do_getsockopt (optval_, optvallen_, routing_id,
routing_id_size);
break;
case ZMQ_RATE:
@ -840,12 +912,7 @@ int zmq::options_t::getsockopt (int option_,
break;
case ZMQ_SOCKS_PROXY:
if (*optvallen_ >= socks_proxy_address.size () + 1) {
memcpy (optval_, socks_proxy_address.c_str (),
socks_proxy_address.size () + 1);
*optvallen_ = socks_proxy_address.size () + 1;
return 0;
}
return do_getsockopt (optval_, optvallen_, socks_proxy_address);
break;
case ZMQ_TCP_KEEPALIVE:
@ -891,29 +958,15 @@ int zmq::options_t::getsockopt (int option_,
break;
case ZMQ_PLAIN_USERNAME:
if (*optvallen_ >= plain_username.size () + 1) {
memcpy (optval_, plain_username.c_str (),
plain_username.size () + 1);
*optvallen_ = plain_username.size () + 1;
return 0;
}
return do_getsockopt (optval_, optvallen_, plain_username);
break;
case ZMQ_PLAIN_PASSWORD:
if (*optvallen_ >= plain_password.size () + 1) {
memcpy (optval_, plain_password.c_str (),
plain_password.size () + 1);
*optvallen_ = plain_password.size () + 1;
return 0;
}
return do_getsockopt (optval_, optvallen_, plain_password);
break;
case ZMQ_ZAP_DOMAIN:
if (*optvallen_ >= zap_domain.size () + 1) {
memcpy (optval_, zap_domain.c_str (), zap_domain.size () + 1);
*optvallen_ = zap_domain.size () + 1;
return 0;
}
return do_getsockopt (optval_, optvallen_, zap_domain);
break;
// If curve encryption isn't built, these options provoke EINVAL
@ -926,36 +979,18 @@ int zmq::options_t::getsockopt (int option_,
break;
case ZMQ_CURVE_PUBLICKEY:
if (*optvallen_ == CURVE_KEYSIZE) {
memcpy (optval_, curve_public_key, CURVE_KEYSIZE);
return 0;
} else if (*optvallen_ == CURVE_KEYSIZE_Z85 + 1) {
zmq_z85_encode ((char *) optval_, curve_public_key,
CURVE_KEYSIZE);
return 0;
}
return do_getsockopt_curve_key (optval_, optvallen_,
curve_public_key);
break;
case ZMQ_CURVE_SECRETKEY:
if (*optvallen_ == CURVE_KEYSIZE) {
memcpy (optval_, curve_secret_key, CURVE_KEYSIZE);
return 0;
} else if (*optvallen_ == CURVE_KEYSIZE_Z85 + 1) {
zmq_z85_encode ((char *) optval_, curve_secret_key,
CURVE_KEYSIZE);
return 0;
}
return do_getsockopt_curve_key (optval_, optvallen_,
curve_secret_key);
break;
case ZMQ_CURVE_SERVERKEY:
if (*optvallen_ == CURVE_KEYSIZE) {
memcpy (optval_, curve_server_key, CURVE_KEYSIZE);
return 0;
} else if (*optvallen_ == CURVE_KEYSIZE_Z85 + 1) {
zmq_z85_encode ((char *) optval_, curve_server_key,
CURVE_KEYSIZE);
return 0;
}
return do_getsockopt_curve_key (optval_, optvallen_,
curve_server_key);
break;
#endif
@ -976,21 +1011,11 @@ int zmq::options_t::getsockopt (int option_,
break;
case ZMQ_GSSAPI_PRINCIPAL:
if (*optvallen_ >= gss_principal.size () + 1) {
memcpy (optval_, gss_principal.c_str (),
gss_principal.size () + 1);
*optvallen_ = gss_principal.size () + 1;
return 0;
}
return do_getsockopt (optval_, optvallen_, gss_principal);
break;
case ZMQ_GSSAPI_SERVICE_PRINCIPAL:
if (*optvallen_ >= gss_service_principal.size () + 1) {
memcpy (optval_, gss_service_principal.c_str (),
gss_service_principal.size () + 1);
*optvallen_ = gss_service_principal.size () + 1;
return 0;
}
return do_getsockopt (optval_, optvallen_, gss_service_principal);
break;
case ZMQ_GSSAPI_PLAINTEXT:
@ -1058,12 +1083,7 @@ int zmq::options_t::getsockopt (int option_,
break;
case ZMQ_BINDTODEVICE:
if (*optvallen_ >= bound_device.size () + 1) {
memcpy (optval_, bound_device.c_str (),
bound_device.size () + 1);
*optvallen_ = bound_device.size () + 1;
return 0;
}
return do_getsockopt (optval_, optvallen_, bound_device);
break;
case ZMQ_ZAP_ENFORCE_DOMAIN:

View File

@ -46,6 +46,10 @@
#include <sys/ucred.h>
#endif
#if __cplusplus >= 201103L
#include <type_traits>
#endif
// Normal base 256 key is 32 bytes
#define CURVE_KEYSIZE 32
// Key encoded using Z85 is 40 bytes
@ -255,6 +259,33 @@ struct options_t
// Use zero copy strategy for storing message content when decoding.
bool zero_copy;
};
int do_getsockopt (void *const optval_,
size_t *const optvallen_,
const void *value_,
const size_t value_len_);
template <typename T>
int do_getsockopt (void *const optval_, size_t *const optvallen_, T value_)
{
#if __cplusplus >= 201103L && (!defined(__GNUC__) || __GNUC__ > 5)
static_assert (std::is_trivially_copyable<T>::value,
"invalid use of do_getsockopt");
#endif
return do_getsockopt (optval_, optvallen_, &value_, sizeof (T));
}
int do_getsockopt (void *const optval_,
size_t *const optvallen_,
const std::string &value_);
int do_setsockopt_int_as_bool_strict (const void *const optval_,
const size_t optvallen_,
bool *out_value_);
int do_setsockopt_int_as_bool_relaxed (const void *const optval_,
const size_t optvallen_,
bool *out_value_);
}
#endif

View File

@ -106,6 +106,8 @@ int zmq::router_t::xsetsockopt (int option_,
switch (option_) {
case ZMQ_CONNECT_ROUTING_ID:
// TODO why isn't it possible to set an empty connect_routing_id
// (which is the default value)
if (optval_ && optvallen_) {
connect_routing_id.assign ((char *) optval_, optvallen_);
return 0;

View File

@ -97,7 +97,6 @@
#include "scatter.hpp"
#include "dgram.hpp"
bool zmq::socket_base_t::check_tag ()
{
return tag == 0xbaddecaf;
@ -395,77 +394,41 @@ int zmq::socket_base_t::getsockopt (int option_,
}
if (option_ == ZMQ_RCVMORE) {
if (*optvallen_ < sizeof (int)) {
errno = EINVAL;
return -1;
}
memset (optval_, 0, *optvallen_);
*((int *) optval_) = rcvmore ? 1 : 0;
*optvallen_ = sizeof (int);
return 0;
return do_getsockopt<int> (optval_, optvallen_, rcvmore ? 1 : 0);
}
if (option_ == ZMQ_FD) {
if (*optvallen_ < sizeof (fd_t)) {
errno = EINVAL;
return -1;
}
if (thread_safe) {
// thread safe socket doesn't provide file descriptor
errno = EINVAL;
return -1;
}
*((fd_t *) optval_) = ((mailbox_t *) mailbox)->get_fd ();
*optvallen_ = sizeof (fd_t);
return 0;
return do_getsockopt<fd_t> (optval_, optvallen_,
((mailbox_t *) mailbox)->get_fd ());
}
if (option_ == ZMQ_EVENTS) {
if (*optvallen_ < sizeof (int)) {
errno = EINVAL;
return -1;
}
int rc = process_commands (0, false);
if (rc != 0 && (errno == EINTR || errno == ETERM)) {
return -1;
}
errno_assert (rc == 0);
*((int *) optval_) = 0;
if (has_out ())
*((int *) optval_) |= ZMQ_POLLOUT;
if (has_in ())
*((int *) optval_) |= ZMQ_POLLIN;
*optvallen_ = sizeof (int);
return 0;
return do_getsockopt<int> (optval_, optvallen_,
(has_out () ? ZMQ_POLLOUT : 0)
| (has_in () ? ZMQ_POLLIN : 0));
}
if (option_ == ZMQ_LAST_ENDPOINT) {
if (*optvallen_ < last_endpoint.size () + 1) {
errno = EINVAL;
return -1;
}
strncpy (static_cast<char *> (optval_), last_endpoint.c_str (),
last_endpoint.size () + 1);
*optvallen_ = last_endpoint.size () + 1;
return 0;
return do_getsockopt (optval_, optvallen_, last_endpoint);
}
if (option_ == ZMQ_THREAD_SAFE) {
if (*optvallen_ < sizeof (int)) {
errno = EINVAL;
return -1;
}
memset (optval_, 0, *optvallen_);
*((int *) optval_) = thread_safe ? 1 : 0;
*optvallen_ = sizeof (int);
return 0;
return do_getsockopt<int> (optval_, optvallen_, thread_safe ? 1 : 0);
}
int rc = options.getsockopt (option_, optval_, optvallen_);
return rc;
return options.getsockopt (option_, optval_, optvallen_);
}
int zmq::socket_base_t::join (const char *group_)

View File

@ -175,13 +175,10 @@ int zmq::stream_t::xsetsockopt (int option_,
const void *optval_,
size_t optvallen_)
{
bool is_int = (optvallen_ == sizeof (int));
int value = 0;
if (is_int)
memcpy (&value, optval_, sizeof (int));
switch (option_) {
case ZMQ_CONNECT_ROUTING_ID:
// TODO why isn't it possible to set an empty connect_routing_id
// (which is the default value)
if (optval_ && optvallen_) {
connect_routing_id.assign ((char *) optval_, optvallen_);
return 0;
@ -189,10 +186,8 @@ int zmq::stream_t::xsetsockopt (int option_,
break;
case ZMQ_STREAM_NOTIFY:
if (is_int && (value == 0 || value == 1)) {
options.raw_notify = (value != 0);
return 0;
}
return do_setsockopt_int_as_bool_strict (optval_, optvallen_,
&options.raw_notify);
break;
default:

View File

@ -59,10 +59,7 @@ int zmq::sub_t::xsetsockopt (int option_,
int rc = msg.init_size (optvallen_ + 1);
errno_assert (rc == 0);
unsigned char *data = (unsigned char *) msg.data ();
if (option_ == ZMQ_SUBSCRIBE)
*data = 1;
else if (option_ == ZMQ_UNSUBSCRIBE)
*data = 0;
*data = (option_ == ZMQ_SUBSCRIBE);
// We explicitly allow a NULL subscription with size zero
if (optvallen_) {
assert (optval_);