0
0
mirror of https://github.com/zeromq/libzmq.git synced 2025-01-14 01:37:56 +08:00

Fail ZMQ_SNDHWM and ZMQ_RCVHWM setsockopt if already connected.

This commit is contained in:
Richard Newton 2015-06-05 10:43:40 +01:00
parent 64e711eb50
commit 9f8b8af926
5 changed files with 70 additions and 8 deletions

View File

@ -635,7 +635,7 @@ check_PROGRAMS = ${test_apps}
# Run the test cases
TESTS = $(test_apps)
XFAIL_TESTS = tests/test_socketopt_hwm
XFAIL_TESTS =
if !ON_LINUX
XFAIL_TESTS += tests/test_abstract_ipc

View File

@ -68,7 +68,8 @@ zmq::options_t::options_t () :
gss_plaintext (false),
socket_id (0),
conflate (false),
handshake_ivl (30000)
handshake_ivl (30000),
connected (false)
{
}
@ -539,7 +540,7 @@ int zmq::options_t::setsockopt (int option_, const void *optval_,
return -1;
}
int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_)
int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_) const
{
bool is_int = (*optvallen_ == sizeof (int));
int *value = (int *) optval_;
@ -884,3 +885,21 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_)
errno = EINVAL;
return -1;
}
bool zmq::options_t::is_valid (int option_) const
{
bool valid = true;
if (connected) {
switch (option_) {
case ZMQ_SNDHWM:
case ZMQ_RCVHWM:
valid = false;
break;
default:
break;
}
}
return valid;
}

View File

@ -55,7 +55,9 @@ namespace zmq
options_t ();
int setsockopt (int option_, const void *optval_, size_t optvallen_);
int getsockopt (int option_, void *optval_, size_t *optvallen_);
int getsockopt (int option_, void *optval_, size_t *optvallen_) const;
bool is_valid (int option_) const;
// High-water marks for message pipes.
int sndhwm;
@ -195,6 +197,7 @@ namespace zmq
// close socket. Default is 30 secs. 0 means no handshake timeout.
int handshake_ivl;
bool connected;
};
}

View File

@ -312,6 +312,13 @@ int zmq::socket_base_t::setsockopt (int option_, const void *optval_,
{
ENTER_MUTEX();
if (!options.is_valid(option_)) {
errno = EINVAL;
EXIT_MUTEX();
return -1;
}
if (unlikely (ctx_terminated)) {
errno = ETERM;
EXIT_MUTEX();
@ -447,6 +454,7 @@ int zmq::socket_base_t::bind (const char *addr_)
if (rc == 0) {
connect_pending (addr_, this);
last_endpoint.assign (addr_);
options.connected = true;
}
EXIT_MUTEX();
return rc;
@ -456,7 +464,10 @@ int zmq::socket_base_t::bind (const char *addr_)
// For convenience's sake, bind can be used interchageable with
// connect for PGM, EPGM and NORM transports.
EXIT_MUTEX();
return connect (addr_);
rc = connect (addr_);
if (rc != -1)
options.connected = true;
return rc;
}
// Remaining trasnports require to be run in an I/O thread, so at this
@ -484,6 +495,7 @@ int zmq::socket_base_t::bind (const char *addr_)
listener->get_address (last_endpoint);
add_endpoint (last_endpoint.c_str (), (own_t *) listener, NULL);
options.connected = true;
EXIT_MUTEX();
return 0;
}
@ -505,6 +517,7 @@ int zmq::socket_base_t::bind (const char *addr_)
listener->get_address (last_endpoint);
add_endpoint (last_endpoint.c_str (), (own_t *) listener, NULL);
options.connected = true;
EXIT_MUTEX();
return 0;
}
@ -526,6 +539,7 @@ int zmq::socket_base_t::bind (const char *addr_)
listener->get_address (last_endpoint);
add_endpoint (addr_, (own_t *) listener, NULL);
options.connected = true;
EXIT_MUTEX();
return 0;
}
@ -657,6 +671,7 @@ int zmq::socket_base_t::connect (const char *addr_)
// remember inproc connections for disconnect
inprocs.insert (inprocs_t::value_type (std::string (addr_), new_pipes [0]));
options.connected = true;
EXIT_MUTEX();
return 0;
}

View File

@ -28,7 +28,7 @@ void test_valid_hwm_change()
* Test that zmq_setsockopt() fails to change the RCVHWM when called
* after a call to zmq_bind().
*/
void test_invalid_hwm_change()
void test_invalid_hwm_change_bind()
{
void *ctx = zmq_ctx_new ();
assert (ctx);
@ -41,12 +41,37 @@ void test_invalid_hwm_change()
assert (rc == 0);
int val = 500;
rc = zmq_setsockopt(bind_socket, ZMQ_RCVHWM, &val, sizeof(val));
rc = zmq_setsockopt (bind_socket, ZMQ_RCVHWM, &val, sizeof(val));
assert (rc == -1);
zmq_close (bind_socket);
zmq_ctx_term (ctx);
}
void test_invalid_hwm_change_connect()
{
void *ctx = zmq_ctx_new();
assert(ctx);
int rc;
void *connect_socket = zmq_socket (ctx, ZMQ_SUB);
assert(connect_socket);
rc = zmq_connect (connect_socket, "inproc://a");
assert(rc == 0);
int val = 500;
rc = zmq_setsockopt (connect_socket, ZMQ_RCVHWM, &val, sizeof(val));
assert(rc == -1);
zmq_close (connect_socket);
zmq_ctx_term (ctx);
}
int main()
{
test_valid_hwm_change();
test_invalid_hwm_change();
test_invalid_hwm_change_bind();
test_invalid_hwm_change_connect();
}