0
0
mirror of https://github.com/zeromq/libzmq.git synced 2025-01-14 09:47:56 +08:00

Merge pull request #3780 from somdoron/WSPINGPONG

problem: ws_engine don't support WS RFC ping-pong and close
This commit is contained in:
Luca Boccassi 2020-01-18 19:31:00 +00:00 committed by GitHub
commit 7ea72e5692
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 184 additions and 8 deletions

View File

@ -454,6 +454,11 @@ bool zmq::msg_t::is_pong () const
return (_u.base.flags & CMD_TYPE_MASK) == pong;
}
bool zmq::msg_t::is_close_cmd () const
{
return (_u.base.flags & CMD_TYPE_MASK) == close_cmd;
}
size_t zmq::msg_t::command_body_size () const
{
if (this->is_ping () || this->is_pong ())

View File

@ -84,6 +84,7 @@ class msg_t
pong = 8,
subscribe = 12,
cancel = 16,
close_cmd = 20,
credential = 32,
routing_id = 64,
shared = 128
@ -126,6 +127,7 @@ class msg_t
bool is_leave () const;
bool is_ping () const;
bool is_pong () const;
bool is_close_cmd () const;
// These are called on each message received by the session_base class,
// so get them inlined to avoid the overhead of 2 function calls per msg

View File

@ -87,7 +87,8 @@ class stream_engine_base_t : public io_object_t, public i_engine
int push_msg_to_session (msg_t *msg_);
int pull_and_encode (msg_t *msg_);
int decode_and_push (msg_t *msg_);
virtual int decode_and_push (msg_t *msg_);
int push_one_then_decode_and_push (msg_t *msg_);
void set_handshake_timer ();
@ -165,7 +166,6 @@ class stream_engine_base_t : public io_object_t, public i_engine
void unplug ();
int write_credential (msg_t *msg_);
int push_one_then_decode_and_push (msg_t *msg_);
void mechanism_ready ();

View File

@ -77,13 +77,13 @@ int zmq::ws_decoder_t::opcode_ready (unsigned char const *)
case zmq::ws_protocol_t::opcode_binary:
break;
case zmq::ws_protocol_t::opcode_close:
_msg_flags = msg_t::command; // TODO: set the command name to CLOSE
_msg_flags = msg_t::command | msg_t::close_cmd;
break;
case zmq::ws_protocol_t::opcode_ping:
_msg_flags = msg_t::ping;
_msg_flags = msg_t::ping | msg_t::command;
break;
case zmq::ws_protocol_t::opcode_pong:
_msg_flags = msg_t::pong;
_msg_flags = msg_t::pong | msg_t::command;
break;
default:
return -1;

View File

@ -55,8 +55,15 @@ void zmq::ws_encoder_t::message_ready ()
{
int offset = 0;
// TODO: it might be close/ping/pong, which should be different op code
_tmp_buf[offset++] = 0x82; // Final | binary
if (in_progress ()->is_ping ())
_tmp_buf[offset++] = 0x80 | zmq::ws_protocol_t::opcode_ping;
else if (in_progress ()->is_pong ())
_tmp_buf[offset++] = 0x80 | zmq::ws_protocol_t::opcode_pong;
else if (in_progress ()->is_close_cmd ())
_tmp_buf[offset++] = 0x80 | zmq::ws_protocol_t::opcode_close;
else
_tmp_buf[offset++] = 0x82; // Final | binary
_tmp_buf[offset] = _must_mask ? 0x80 : 0x00;
size_t size = in_progress ()->size ();

View File

@ -122,7 +122,8 @@ zmq::ws_engine_t::ws_engine_t (fd_t fd_,
_header_name_position (0),
_header_value_position (0),
_header_upgrade_websocket (false),
_header_connection_upgrade (false)
_header_connection_upgrade (false),
_heartbeat_timeout (0)
{
memset (_websocket_key, 0, MAX_HEADER_VALUE_LENGTH + 1);
memset (_websocket_accept, 0, MAX_HEADER_VALUE_LENGTH + 1);
@ -130,10 +131,18 @@ zmq::ws_engine_t::ws_engine_t (fd_t fd_,
_next_msg = &ws_engine_t::next_handshake_command;
_process_msg = &ws_engine_t::process_handshake_command;
_close_msg.init ();
if (_options.heartbeat_interval > 0) {
_heartbeat_timeout = _options.heartbeat_timeout;
if (_heartbeat_timeout == -1)
_heartbeat_timeout = _options.heartbeat_interval;
}
}
zmq::ws_engine_t::~ws_engine_t ()
{
_close_msg.close ();
}
void zmq::ws_engine_t::start_ws_handshake ()
@ -227,6 +236,13 @@ bool zmq::ws_engine_t::select_protocol (char *protocol_)
&ws_engine_t::routing_id_msg);
_process_msg = static_cast<int (stream_engine_base_t::*) (msg_t *)> (
&ws_engine_t::process_routing_id_msg);
// No mechanism in place, enabling heartbeat
if (_options.heartbeat_interval > 0 && !_has_heartbeat_timer) {
add_timer (_options.heartbeat_interval, heartbeat_ivl_timer_id);
_has_heartbeat_timer = true;
}
return true;
}
if (_options.mechanism == ZMQ_NULL
@ -902,6 +918,107 @@ bool zmq::ws_engine_t::client_handshake ()
return false;
}
int zmq::ws_engine_t::decode_and_push (msg_t *msg_)
{
zmq_assert (_mechanism != NULL);
// with WS engine, ping and pong commands are control messages and should not go through any mechanism
if (msg_->is_ping () || msg_->is_pong () || msg_->is_close_cmd ()) {
if (process_command_message (msg_) == -1)
return -1;
} else if (_mechanism->decode (msg_) == -1)
return -1;
if (_has_timeout_timer) {
_has_timeout_timer = false;
cancel_timer (heartbeat_timeout_timer_id);
}
if (msg_->flags () & msg_t::command && !msg_->is_ping ()
&& !msg_->is_pong () && !msg_->is_close_cmd ())
process_command_message (msg_);
if (_metadata)
msg_->set_metadata (_metadata);
if (session ()->push_msg (msg_) == -1) {
if (errno == EAGAIN)
_process_msg = &ws_engine_t::push_one_then_decode_and_push;
return -1;
}
return 0;
}
int zmq::ws_engine_t::produce_close_message (msg_t *msg_)
{
int rc = msg_->move (_close_msg);
errno_assert (rc == 0);
_next_msg = static_cast<int (stream_engine_base_t::*) (msg_t *)> (
&ws_engine_t::produce_no_msg_after_close);
return rc;
}
int zmq::ws_engine_t::produce_no_msg_after_close (msg_t *msg_)
{
_next_msg = static_cast<int (stream_engine_base_t::*) (msg_t *)> (
&ws_engine_t::close_connection_after_close);
errno = EAGAIN;
return -1;
}
int zmq::ws_engine_t::close_connection_after_close (msg_t *msg_)
{
error (connection_error);
errno = ECONNRESET;
return -1;
}
int zmq::ws_engine_t::produce_ping_message (msg_t *msg_)
{
int rc = msg_->init ();
errno_assert (rc == 0);
msg_->set_flags (msg_t::command | msg_t::ping);
_next_msg = &ws_engine_t::pull_and_encode;
if (!_has_timeout_timer && _heartbeat_timeout > 0) {
add_timer (_heartbeat_timeout, heartbeat_timeout_timer_id);
_has_timeout_timer = true;
}
return rc;
}
int zmq::ws_engine_t::produce_pong_message (msg_t *msg_)
{
int rc = msg_->init ();
errno_assert (rc == 0);
msg_->set_flags (msg_t::command | msg_t::pong);
_next_msg = &ws_engine_t::pull_and_encode;
return rc;
}
int zmq::ws_engine_t::process_command_message (msg_t *msg_)
{
if (msg_->is_ping ()) {
_next_msg = static_cast<int (stream_engine_base_t::*) (msg_t *)> (
&ws_engine_t::produce_pong_message);
out_event ();
} else if (msg_->is_close_cmd ()) {
int rc = _close_msg.copy (*msg_);
errno_assert (rc == 0);
_next_msg = static_cast<int (stream_engine_base_t::*) (msg_t *)> (
&ws_engine_t::produce_close_message);
out_event ();
}
return 0;
}
static int
encode_base64 (const unsigned char *in_, int in_len_, char *out_, int out_len_)
{

View File

@ -135,6 +135,10 @@ class ws_engine_t : public stream_engine_base_t
~ws_engine_t ();
protected:
int decode_and_push (msg_t *msg_);
int process_command_message (msg_t *msg_);
int produce_pong_message (msg_t *msg_);
int produce_ping_message (msg_t *msg_);
bool handshake ();
void plug_internal ();
void start_ws_handshake ();
@ -142,6 +146,9 @@ class ws_engine_t : public stream_engine_base_t
private:
int routing_id_msg (msg_t *msg_);
int process_routing_id_msg (msg_t *msg_);
int produce_close_message (msg_t *msg_);
int produce_no_msg_after_close (msg_t *msg_);
int close_connection_after_close (msg_t *msg_);
bool select_protocol (char *protocol);
@ -166,6 +173,9 @@ class ws_engine_t : public stream_engine_base_t
char _websocket_protocol[256];
char _websocket_key[MAX_HEADER_VALUE_LENGTH + 1];
char _websocket_accept[MAX_HEADER_VALUE_LENGTH + 1];
int _heartbeat_timeout;
msg_t _close_msg;
};
}

View File

@ -52,6 +52,40 @@ void test_roundtrip ()
test_context_socket_close (sb);
}
void test_heartbeat ()
{
char connect_address[MAX_SOCKET_STRING + strlen ("/heartbeat")];
size_t addr_length = sizeof (connect_address);
void *sb = test_context_socket (ZMQ_REP);
TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (sb, "ws://*:*/heartbeat"));
TEST_ASSERT_SUCCESS_ERRNO (
zmq_getsockopt (sb, ZMQ_LAST_ENDPOINT, connect_address, &addr_length));
strcat (connect_address, "/heartbeat");
void *sc = test_context_socket (ZMQ_REQ);
// Setting heartbeat settings
int ivl = 10;
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (sc, ZMQ_HEARTBEAT_IVL, &ivl, sizeof (ivl)));
// Disable reconnect, to make sure the ping-pong actually work
ivl = -1;
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (sc, ZMQ_RECONNECT_IVL, &ivl, sizeof (ivl)));
// Connect to server
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sc, connect_address));
// Make sure some ping and pong going through
msleep (100);
bounce (sb, sc);
test_context_socket_close (sc);
test_context_socket_close (sb);
}
void test_short_message ()
{
char connect_address[MAX_SOCKET_STRING + strlen ("/short")];
@ -170,6 +204,7 @@ int main ()
RUN_TEST (test_roundtrip);
RUN_TEST (test_short_message);
RUN_TEST (test_large_message);
RUN_TEST (test_heartbeat);
if (zmq_has ("curve"))
RUN_TEST (test_curve);