diff --git a/.gitignore b/.gitignore index b54d75f7..f21c5e17 100644 --- a/.gitignore +++ b/.gitignore @@ -152,6 +152,7 @@ test_socket_null test_xpub_verbose test_mock_pub_sub test_proxy_hwm +test_ws_transport unittest_ip_resolver unittest_mtrie unittest_poller diff --git a/CMakeLists.txt b/CMakeLists.txt index c6a7d930..ecd726aa 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -221,6 +221,8 @@ else() message(FATAL_ERROR "Invalid polling method") endif() +list(APPEND sources ${CMAKE_CURRENT_SOURCE_DIR}/external/sha1/sha1.c ${CMAKE_CURRENT_SOURCE_DIR}/external/sha1/sha1.h) + if(POLLER STREQUAL "epoll" AND WIN32) message(STATUS "Including wepoll") list(APPEND sources ${CMAKE_CURRENT_SOURCE_DIR}/external/wepoll/wepoll.c ${CMAKE_CURRENT_SOURCE_DIR}/external/wepoll/wepoll.h) @@ -780,6 +782,11 @@ set(cxx-sources gather.cpp ip_resolver.cpp zap_client.cpp + ws_connecter.cpp + ws_decoder.cpp + ws_encoder.cpp + ws_engine.cpp + ws_listener.cpp # at least for VS, the header files must also be listed address.hpp array.hpp @@ -910,6 +917,12 @@ set(cxx-sources vmci_listener.hpp windows.hpp wire.hpp + ws_connecter.hpp + ws_decoder.hpp + ws_encoder.hpp + ws_engine.hpp + ws_listener.hpp + ws_protocol.hpp xpub.hpp xsub.hpp ypipe.hpp diff --git a/Makefile.am b/Makefile.am index 2ac8999a..033cb39a 100644 --- a/Makefile.am +++ b/Makefile.am @@ -247,6 +247,17 @@ src_libzmq_la_SOURCES = \ src/vmci_listener.hpp \ src/windows.hpp \ src/wire.hpp \ + src/ws_connecter.cpp \ + src/ws_connecter.hpp \ + src/ws_decoder.cpp \ + src/ws_decoder.hpp \ + src/ws_encoder.cpp \ + src/ws_encoder.hpp \ + src/ws_engine.cpp \ + src/ws_engine.hpp \ + src/ws_listener.cpp \ + src/ws_listener.hpp \ + src/ws_protocol.hpp \ src/xpub.cpp \ src/xpub.hpp \ src/xsub.cpp \ @@ -263,7 +274,9 @@ src_libzmq_la_SOURCES = \ src/socket_poller.hpp \ src/zap_client.cpp \ src/zap_client.hpp \ - src/zmq_draft.h + src/zmq_draft.h \ + external/sha1/sha1.c \ + external/sha1/sha1.h if USE_WEPOLL src_libzmq_la_SOURCES += \ @@ -459,7 +472,8 @@ test_apps = \ tests/test_sodium \ tests/test_reconnect_ivl \ tests/test_mock_pub_sub \ - tests/test_socket_null + tests/test_socket_null \ + tests/test_ws_transport UNITY_CPPFLAGS = -I$(top_srcdir)/external/unity -DUNITY_USE_COMMAND_LINE_ARGS -DUNITY_EXCLUDE_FLOAT UNITY_LIBS = $(top_builddir)/external/unity/libunity.a @@ -774,6 +788,10 @@ tests_test_mock_pub_sub_SOURCES = tests/test_mock_pub_sub.cpp tests_test_mock_pub_sub_LDADD = src/libzmq.la ${TESTUTIL_LIBS} tests_test_mock_pub_sub_CPPFLAGS = ${TESTUTIL_CPPFLAGS} +tests_test_ws_transport_SOURCES = tests/test_ws_transport.cpp +tests_test_ws_transport_LDADD = src/libzmq.la ${TESTUTIL_LIBS} +tests_test_ws_transport_CPPFLAGS = ${TESTUTIL_CPPFLAGS} + if HAVE_CURVE test_apps += \ diff --git a/external/sha1/sha1.c b/external/sha1/sha1.c new file mode 100644 index 00000000..2702c89a --- /dev/null +++ b/external/sha1/sha1.c @@ -0,0 +1,336 @@ +/* + * Copyright (C) 1995, 1996, 1997, and 1998 WIDE Project. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * 3. Neither the name of the project nor the names of its contributors + * may be used to endorse or promote products derived from this software + * without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE PROJECT AND CONTRIBUTORS ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE PROJECT OR CONTRIBUTORS BE LIABLE + * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS + * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) + * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT + * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY + * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + */ + +/* + * FIPS pub 180-1: Secure Hash Algorithm (SHA-1) + * based on: http://www.itl.nist.gov/fipspubs/fip180-1.htm + * implemented by Jun-ichiro itojun Itoh + */ + +#include "sha1.h" +#include + + +/* constant table */ +static uint32_t _K[] = {0x5a827999, 0x6ed9eba1, 0x8f1bbcdc, 0xca62c1d6}; + +#define K(t) _K[(t) / 20] + +#define F0(b, c, d) (((b) & (c)) | ((~(b)) & (d))) +#define F1(b, c, d) (((b) ^ (c)) ^ (d)) +#define F2(b, c, d) (((b) & (c)) | ((b) & (d)) | ((c) & (d))) +#define F3(b, c, d) (((b) ^ (c)) ^ (d)) + +#define S(n, x) (((x) << (n)) | ((x) >> (32 - (n)))) + +#define H(n) (ctxt->h.b32[(n)]) +#define COUNT (ctxt->count) +#define BCOUNT (ctxt->c.b64[0] / 8) +#define W(n) (ctxt->m.b32[(n)]) + +#define PUTBYTE(x) \ +do { \ + ctxt->m.b8[(COUNT % 64)] = (x); \ + COUNT++; \ + COUNT %= 64; \ + ctxt->c.b64[0] += 8; \ + if (COUNT % 64 == 0) \ + sha1_step(ctxt); \ +} while (0) + +#define PUTPAD(x) \ +do { \ + ctxt->m.b8[(COUNT % 64)] = (x); \ + COUNT++; \ + COUNT %= 64; \ + if (COUNT % 64 == 0) \ + sha1_step(ctxt); \ +} while (0) + +static void sha1_step(struct sha1_ctxt *); + +static void +sha1_step(struct sha1_ctxt * ctxt) +{ + uint32_t a, + b, + c, + d, + e; + size_t t, + s; + uint32_t tmp; + +#ifndef WORDS_BIGENDIAN + struct sha1_ctxt tctxt; + + memmove(&tctxt.m.b8[0], &ctxt->m.b8[0], 64); + ctxt->m.b8[0] = tctxt.m.b8[3]; + ctxt->m.b8[1] = tctxt.m.b8[2]; + ctxt->m.b8[2] = tctxt.m.b8[1]; + ctxt->m.b8[3] = tctxt.m.b8[0]; + ctxt->m.b8[4] = tctxt.m.b8[7]; + ctxt->m.b8[5] = tctxt.m.b8[6]; + ctxt->m.b8[6] = tctxt.m.b8[5]; + ctxt->m.b8[7] = tctxt.m.b8[4]; + ctxt->m.b8[8] = tctxt.m.b8[11]; + ctxt->m.b8[9] = tctxt.m.b8[10]; + ctxt->m.b8[10] = tctxt.m.b8[9]; + ctxt->m.b8[11] = tctxt.m.b8[8]; + ctxt->m.b8[12] = tctxt.m.b8[15]; + ctxt->m.b8[13] = tctxt.m.b8[14]; + ctxt->m.b8[14] = tctxt.m.b8[13]; + ctxt->m.b8[15] = tctxt.m.b8[12]; + ctxt->m.b8[16] = tctxt.m.b8[19]; + ctxt->m.b8[17] = tctxt.m.b8[18]; + ctxt->m.b8[18] = tctxt.m.b8[17]; + ctxt->m.b8[19] = tctxt.m.b8[16]; + ctxt->m.b8[20] = tctxt.m.b8[23]; + ctxt->m.b8[21] = tctxt.m.b8[22]; + ctxt->m.b8[22] = tctxt.m.b8[21]; + ctxt->m.b8[23] = tctxt.m.b8[20]; + ctxt->m.b8[24] = tctxt.m.b8[27]; + ctxt->m.b8[25] = tctxt.m.b8[26]; + ctxt->m.b8[26] = tctxt.m.b8[25]; + ctxt->m.b8[27] = tctxt.m.b8[24]; + ctxt->m.b8[28] = tctxt.m.b8[31]; + ctxt->m.b8[29] = tctxt.m.b8[30]; + ctxt->m.b8[30] = tctxt.m.b8[29]; + ctxt->m.b8[31] = tctxt.m.b8[28]; + ctxt->m.b8[32] = tctxt.m.b8[35]; + ctxt->m.b8[33] = tctxt.m.b8[34]; + ctxt->m.b8[34] = tctxt.m.b8[33]; + ctxt->m.b8[35] = tctxt.m.b8[32]; + ctxt->m.b8[36] = tctxt.m.b8[39]; + ctxt->m.b8[37] = tctxt.m.b8[38]; + ctxt->m.b8[38] = tctxt.m.b8[37]; + ctxt->m.b8[39] = tctxt.m.b8[36]; + ctxt->m.b8[40] = tctxt.m.b8[43]; + ctxt->m.b8[41] = tctxt.m.b8[42]; + ctxt->m.b8[42] = tctxt.m.b8[41]; + ctxt->m.b8[43] = tctxt.m.b8[40]; + ctxt->m.b8[44] = tctxt.m.b8[47]; + ctxt->m.b8[45] = tctxt.m.b8[46]; + ctxt->m.b8[46] = tctxt.m.b8[45]; + ctxt->m.b8[47] = tctxt.m.b8[44]; + ctxt->m.b8[48] = tctxt.m.b8[51]; + ctxt->m.b8[49] = tctxt.m.b8[50]; + ctxt->m.b8[50] = tctxt.m.b8[49]; + ctxt->m.b8[51] = tctxt.m.b8[48]; + ctxt->m.b8[52] = tctxt.m.b8[55]; + ctxt->m.b8[53] = tctxt.m.b8[54]; + ctxt->m.b8[54] = tctxt.m.b8[53]; + ctxt->m.b8[55] = tctxt.m.b8[52]; + ctxt->m.b8[56] = tctxt.m.b8[59]; + ctxt->m.b8[57] = tctxt.m.b8[58]; + ctxt->m.b8[58] = tctxt.m.b8[57]; + ctxt->m.b8[59] = tctxt.m.b8[56]; + ctxt->m.b8[60] = tctxt.m.b8[63]; + ctxt->m.b8[61] = tctxt.m.b8[62]; + ctxt->m.b8[62] = tctxt.m.b8[61]; + ctxt->m.b8[63] = tctxt.m.b8[60]; +#endif + + a = H(0); + b = H(1); + c = H(2); + d = H(3); + e = H(4); + + for (t = 0; t < 20; t++) + { + s = t & 0x0f; + if (t >= 16) + W(s) = S(1, W((s + 13) & 0x0f) ^ W((s + 8) & 0x0f) ^ W((s + 2) & 0x0f) ^ W(s)); + tmp = S(5, a) + F0(b, c, d) + e + W(s) + K(t); + e = d; + d = c; + c = S(30, b); + b = a; + a = tmp; + } + for (t = 20; t < 40; t++) + { + s = t & 0x0f; + W(s) = S(1, W((s + 13) & 0x0f) ^ W((s + 8) & 0x0f) ^ W((s + 2) & 0x0f) ^ W(s)); + tmp = S(5, a) + F1(b, c, d) + e + W(s) + K(t); + e = d; + d = c; + c = S(30, b); + b = a; + a = tmp; + } + for (t = 40; t < 60; t++) + { + s = t & 0x0f; + W(s) = S(1, W((s + 13) & 0x0f) ^ W((s + 8) & 0x0f) ^ W((s + 2) & 0x0f) ^ W(s)); + tmp = S(5, a) + F2(b, c, d) + e + W(s) + K(t); + e = d; + d = c; + c = S(30, b); + b = a; + a = tmp; + } + for (t = 60; t < 80; t++) + { + s = t & 0x0f; + W(s) = S(1, W((s + 13) & 0x0f) ^ W((s + 8) & 0x0f) ^ W((s + 2) & 0x0f) ^ W(s)); + tmp = S(5, a) + F3(b, c, d) + e + W(s) + K(t); + e = d; + d = c; + c = S(30, b); + b = a; + a = tmp; + } + + H(0) = H(0) + a; + H(1) = H(1) + b; + H(2) = H(2) + c; + H(3) = H(3) + d; + H(4) = H(4) + e; + + memset(&ctxt->m.b8[0], 0, 64); +} + +/*------------------------------------------------------------*/ + +void +sha1_init(struct sha1_ctxt * ctxt) +{ + memset(ctxt, 0, sizeof(struct sha1_ctxt)); + H(0) = 0x67452301; + H(1) = 0xefcdab89; + H(2) = 0x98badcfe; + H(3) = 0x10325476; + H(4) = 0xc3d2e1f0; +} + +void +sha1_pad(struct sha1_ctxt * ctxt) +{ + size_t padlen; /* pad length in bytes */ + size_t padstart; + + PUTPAD(0x80); + + padstart = COUNT % 64; + padlen = 64 - padstart; + if (padlen < 8) + { + memset(&ctxt->m.b8[padstart], 0, padlen); + COUNT += (uint8_t) padlen; + COUNT %= 64; + sha1_step(ctxt); + padstart = COUNT % 64; /* should be 0 */ + padlen = 64 - padstart; /* should be 64 */ + } + memset(&ctxt->m.b8[padstart], 0, padlen - 8); + COUNT += ((uint8_t) padlen - 8); + COUNT %= 64; +#ifdef WORDS_BIGENDIAN + PUTPAD(ctxt->c.b8[0]); + PUTPAD(ctxt->c.b8[1]); + PUTPAD(ctxt->c.b8[2]); + PUTPAD(ctxt->c.b8[3]); + PUTPAD(ctxt->c.b8[4]); + PUTPAD(ctxt->c.b8[5]); + PUTPAD(ctxt->c.b8[6]); + PUTPAD(ctxt->c.b8[7]); +#else + PUTPAD(ctxt->c.b8[7]); + PUTPAD(ctxt->c.b8[6]); + PUTPAD(ctxt->c.b8[5]); + PUTPAD(ctxt->c.b8[4]); + PUTPAD(ctxt->c.b8[3]); + PUTPAD(ctxt->c.b8[2]); + PUTPAD(ctxt->c.b8[1]); + PUTPAD(ctxt->c.b8[0]); +#endif +} + +void +sha1_loop(struct sha1_ctxt * ctxt, const uint8_t *input0, size_t len) +{ + const uint8_t *input; + size_t gaplen; + size_t gapstart; + size_t off; + size_t copysiz; + + input = (const uint8_t *) input0; + off = 0; + + while (off < len) + { + gapstart = COUNT % 64; + gaplen = 64 - gapstart; + + copysiz = (gaplen < len - off) ? gaplen : len - off; + memmove(&ctxt->m.b8[gapstart], &input[off], copysiz); + COUNT += (uint8_t) copysiz; + COUNT %= 64; + ctxt->c.b64[0] += copysiz * 8; + if (COUNT % 64 == 0) + sha1_step(ctxt); + off += copysiz; + } +} + +void +sha1_result(struct sha1_ctxt * ctxt, uint8_t *digest0) +{ + uint8_t *digest; + + digest = (uint8_t *) digest0; + sha1_pad(ctxt); +#ifdef WORDS_BIGENDIAN + memmove(digest, &ctxt->h.b8[0], 20); +#else + digest[0] = ctxt->h.b8[3]; + digest[1] = ctxt->h.b8[2]; + digest[2] = ctxt->h.b8[1]; + digest[3] = ctxt->h.b8[0]; + digest[4] = ctxt->h.b8[7]; + digest[5] = ctxt->h.b8[6]; + digest[6] = ctxt->h.b8[5]; + digest[7] = ctxt->h.b8[4]; + digest[8] = ctxt->h.b8[11]; + digest[9] = ctxt->h.b8[10]; + digest[10] = ctxt->h.b8[9]; + digest[11] = ctxt->h.b8[8]; + digest[12] = ctxt->h.b8[15]; + digest[13] = ctxt->h.b8[14]; + digest[14] = ctxt->h.b8[13]; + digest[15] = ctxt->h.b8[12]; + digest[16] = ctxt->h.b8[19]; + digest[17] = ctxt->h.b8[18]; + digest[18] = ctxt->h.b8[17]; + digest[19] = ctxt->h.b8[16]; +#endif +} \ No newline at end of file diff --git a/external/sha1/sha1.h b/external/sha1/sha1.h new file mode 100644 index 00000000..7354d133 --- /dev/null +++ b/external/sha1/sha1.h @@ -0,0 +1,87 @@ +/* contrib/pgcrypto/sha1.h */ +/* $KAME: sha1.h,v 1.4 2000/02/22 14:01:18 itojun Exp $ */ + +/* + * Copyright (C) 1995, 1996, 1997, and 1998 WIDE Project. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * 3. Neither the name of the project nor the names of its contributors + * may be used to endorse or promote products derived from this software + * without specific prior written permission. + * + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE PROJECT OR CONTRIBUTORS BE LIABLE + * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS + * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) + * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT + * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY + * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + */ +/* + * FIPS pub 180-1: Secure Hash Algorithm (SHA-1) + * THIS SOFTWARE IS PROVIDED BY THE PROJECT AND CONTRIBUTORS ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * based on: http://www.itl.nist.gov/fipspubs/fip180-1.htm + * implemented by Jun-ichiro itojun Itoh + */ + +#ifndef _NETINET6_SHA1_H_ +#define _NETINET6_SHA1_H_ + +#ifdef __cplusplus +extern "C" { +#endif + +#include +#include "../../src/stdint.hpp" + +struct sha1_ctxt +{ + union + { + uint8_t b8[20]; + uint32_t b32[5]; + } h; + union + { + uint8_t b8[8]; + uint64_t b64[1]; + } c; + union + { + uint8_t b8[64]; + uint32_t b32[16]; + } m; + uint8_t count; +}; + +void sha1_init(struct sha1_ctxt *); +void sha1_pad(struct sha1_ctxt *); +void sha1_loop(struct sha1_ctxt *, const uint8_t *, size_t); +void sha1_result(struct sha1_ctxt *, uint8_t *); + +// Compatibility with OpenSSL API +#define SHA_DIGEST_LENGTH 20 +typedef struct sha1_ctxt SHA_CTX; + +#define SHA1_Init(x) sha1_init((x)) +#define SHA1_Update(x, y, z) sha1_loop((x), (y), (z)) +#define SHA1_Final(x, y) sha1_result((y), (x)) + +#define SHA1_RESULTLEN (160/8) + +#ifdef __cplusplus +} +#endif + +#endif /* _NETINET6_SHA1_H_ */ diff --git a/include/zmq.h b/include/zmq.h index 72b12b54..9f889111 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -475,6 +475,7 @@ ZMQ_EXPORT const char *zmq_msg_gets (const zmq_msg_t *msg_, #define ZMQ_PROTOCOL_ERROR_ZAP_BAD_VERSION 0x20000003 #define ZMQ_PROTOCOL_ERROR_ZAP_INVALID_STATUS_CODE 0x20000004 #define ZMQ_PROTOCOL_ERROR_ZAP_INVALID_METADATA 0x20000005 +#define ZMQ_PROTOCOL_ERROR_WS_UNSPECIFIED 0x30000000 ZMQ_EXPORT void *zmq_socket (void *, int type_); ZMQ_EXPORT int zmq_close (void *s_); diff --git a/src/address.cpp b/src/address.cpp index 021686d6..75aeb516 100644 --- a/src/address.cpp +++ b/src/address.cpp @@ -56,7 +56,7 @@ zmq::address_t::address_t (const std::string &protocol_, zmq::address_t::~address_t () { - if (protocol == protocol_name::tcp) { + if (protocol == protocol_name::tcp || protocol == protocol_name::ws) { LIBZMQ_DELETE (resolved.tcp_addr); } else if (protocol == protocol_name::udp) { LIBZMQ_DELETE (resolved.udp_addr); diff --git a/src/address.hpp b/src/address.hpp index b20a10e5..090ce1f6 100644 --- a/src/address.hpp +++ b/src/address.hpp @@ -60,6 +60,7 @@ namespace protocol_name static const char inproc[] = "inproc"; static const char tcp[] = "tcp"; static const char udp[] = "udp"; +static const char ws[] = "ws"; #if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS \ && !defined ZMQ_HAVE_VXWORKS static const char ipc[] = "ipc"; diff --git a/src/session_base.cpp b/src/session_base.cpp index a6d8bce4..a947599a 100644 --- a/src/session_base.cpp +++ b/src/session_base.cpp @@ -35,6 +35,7 @@ #include "pipe.hpp" #include "likely.hpp" #include "tcp_connecter.hpp" +#include "ws_connecter.hpp" #include "ipc_connecter.hpp" #include "tipc_connecter.hpp" #include "socks_connecter.hpp" @@ -559,6 +560,8 @@ zmq::session_base_t::connecter_factory_entry_t zmq::session_base_t::_connecter_factories[] = { connecter_factory_entry_t (protocol_name::tcp, &zmq::session_base_t::create_connecter_tcp), + connecter_factory_entry_t (protocol_name::ws, + &zmq::session_base_t::create_connecter_ws), #if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS \ && !defined ZMQ_HAVE_VXWORKS connecter_factory_entry_t (protocol_name::ipc, @@ -681,6 +684,13 @@ zmq::own_t *zmq::session_base_t::create_connecter_tcp (io_thread_t *io_thread_, tcp_connecter_t (io_thread_, this, options, _addr, wait_); } +zmq::own_t *zmq::session_base_t::create_connecter_ws (io_thread_t *io_thread_, + bool wait_) +{ + return new (std::nothrow) + ws_connecter_t (io_thread_, this, options, _addr, wait_); +} + #ifdef ZMQ_HAVE_OPENPGM void zmq::session_base_t::start_connecting_pgm (io_thread_t *io_thread_) { diff --git a/src/session_base.hpp b/src/session_base.hpp index c72161ce..6d3abb63 100644 --- a/src/session_base.hpp +++ b/src/session_base.hpp @@ -118,6 +118,7 @@ class session_base_t : public own_t, public io_object_t, public i_pipe_events own_t *create_connecter_tipc (io_thread_t *io_thread_, bool wait_); own_t *create_connecter_ipc (io_thread_t *io_thread_, bool wait_); own_t *create_connecter_tcp (io_thread_t *io_thread_, bool wait_); + own_t *create_connecter_ws (io_thread_t *io_thread_, bool wait_); typedef void (session_base_t::*start_connecting_fun_t) ( io_thread_t *io_thread); diff --git a/src/socket_base.cpp b/src/socket_base.cpp index e1b2fae7..993a2e23 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -50,6 +50,7 @@ #include "socket_base.hpp" #include "tcp_listener.hpp" +#include "ws_listener.hpp" #include "ipc_listener.hpp" #include "tipc_listener.hpp" #include "tcp_connecter.hpp" @@ -333,6 +334,7 @@ int zmq::socket_base_t::check_protocol (const std::string &protocol_) const && protocol_ != protocol_name::ipc #endif && protocol_ != protocol_name::tcp + && protocol_ != protocol_name::ws #if defined ZMQ_HAVE_OPENPGM // pgm/epgm transports only available if 0MQ is compiled with OpenPGM. && protocol_ != "pgm" @@ -629,6 +631,27 @@ int zmq::socket_base_t::bind (const char *endpoint_uri_) return 0; } + if (protocol == protocol_name::ws) { + ws_listener_t *listener = + new (std::nothrow) ws_listener_t (io_thread, this, options); + alloc_assert (listener); + rc = listener->set_local_address (address.c_str ()); + if (rc != 0) { + LIBZMQ_DELETE (listener); + event_bind_failed (make_unconnected_bind_endpoint_pair (address), + zmq_errno ()); + return -1; + } + + // Save last endpoint URI + listener->get_local_address (_last_endpoint); + + add_endpoint (make_unconnected_bind_endpoint_pair (_last_endpoint), + static_cast (listener), NULL); + options.connected = true; + return 0; + } + #if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS \ && !defined ZMQ_HAVE_VXWORKS if (protocol == protocol_name::ipc) { @@ -865,6 +888,47 @@ int zmq::socket_base_t::connect (const char *endpoint_uri_) } // Defer resolution until a socket is opened paddr->resolved.tcp_addr = NULL; + } else if (protocol == protocol_name::ws) { + // Do some basic sanity checks on ws:// address syntax + // - hostname starts with digit or letter, with embedded '-' or '.' + // - IPv6 address may contain hex chars and colons. + // - IPv6 link local address may contain % followed by interface name / zone_id + // (Reference: https://tools.ietf.org/html/rfc4007) + // - IPv4 address may contain decimal digits and dots. + // - Address must end in ":port" where port is *, or numeric + // - Address may contain two parts separated by ':' + // Following code is quick and dirty check to catch obvious errors, + // without trying to be fully accurate. + const char *check = address.c_str (); + if (isalnum (*check) || isxdigit (*check) || *check == '[' + || *check == ':') { + check++; + while (isalnum (*check) || isxdigit (*check) || *check == '.' + || *check == '-' || *check == ':' || *check == '%' + || *check == ';' || *check == '[' || *check == ']' + || *check == '_' || *check == '*') { + check++; + } + } + // Assume the worst, now look for success + rc = -1; + // Did we reach the end of the address safely? + if (*check == 0) { + // Do we have a valid port string? (cannot be '*' in connect + check = strrchr (address.c_str (), ':'); + if (check) { + check++; + if (*check && (isdigit (*check))) + rc = 0; // Valid + } + } + if (rc == -1) { + errno = EINVAL; + LIBZMQ_DELETE (paddr); + return -1; + } + // Defer resolution until a socket is opened + paddr->resolved.tcp_addr = NULL; } #if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS \ && !defined ZMQ_HAVE_VXWORKS diff --git a/src/stream_connecter_base.cpp b/src/stream_connecter_base.cpp index 64bdef13..a9b2aa42 100644 --- a/src/stream_connecter_base.cpp +++ b/src/stream_connecter_base.cpp @@ -53,9 +53,9 @@ zmq::stream_connecter_base_t::stream_connecter_base_t ( _s (retired_fd), _handle (static_cast (NULL)), _socket (session_->get_socket ()), + _session (session_), _delayed_start (delayed_start_), _reconnect_timer_started (false), - _session (session_), _current_reconnect_ivl (options.reconnect_ivl) { zmq_assert (_addr); diff --git a/src/stream_connecter_base.hpp b/src/stream_connecter_base.hpp index 3e28de2f..977896b8 100644 --- a/src/stream_connecter_base.hpp +++ b/src/stream_connecter_base.hpp @@ -63,7 +63,7 @@ class stream_connecter_base_t : public own_t, public io_object_t void timer_event (int id_); // Internal function to create the engine after connection was established. - void create_engine (fd_t fd, const std::string &local_address_); + virtual void create_engine (fd_t fd, const std::string &local_address_); // Internal function to add a reconnect timer void add_reconnect_timer (); @@ -91,6 +91,9 @@ class stream_connecter_base_t : public own_t, public io_object_t // Socket zmq::socket_base_t *const _socket; + // Reference to the session we belong to. + zmq::session_base_t *const _session; + private: // ID of the timer used to delay the reconnection. enum @@ -111,9 +114,6 @@ class stream_connecter_base_t : public own_t, public io_object_t // True iff a timer has been started. bool _reconnect_timer_started; - // Reference to the session we belong to. - zmq::session_base_t *const _session; - // Current reconnect ivl, updated for backoff strategy int _current_reconnect_ivl; diff --git a/src/stream_listener_base.hpp b/src/stream_listener_base.hpp index 4925908f..3a5fb9c6 100644 --- a/src/stream_listener_base.hpp +++ b/src/stream_listener_base.hpp @@ -67,7 +67,7 @@ class stream_listener_base_t : public own_t, public io_object_t // Close the listening socket. virtual int close (); - void create_engine (fd_t fd); + virtual void create_engine (fd_t fd); // Underlying socket. fd_t _s; diff --git a/src/ws_connecter.cpp b/src/ws_connecter.cpp new file mode 100644 index 00000000..10091f27 --- /dev/null +++ b/src/ws_connecter.cpp @@ -0,0 +1,322 @@ +/* + Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file + + This file is part of libzmq, the ZeroMQ core engine in C++. + + libzmq is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License (LGPL) as published + by the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + As a special exception, the Contributors give you permission to link + this library with independent modules to produce an executable, + regardless of the license terms of these independent modules, and to + copy and distribute the resulting executable under terms of your choice, + provided that you also meet, for each linked independent module, the + terms and conditions of the license of that module. An independent + module is a module which is not derived from or based on this library. + If you modify this library, you must extend this exception to your + version of the library. + + libzmq is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include "precompiled.hpp" +#include +#include + +#include "macros.hpp" +#include "ws_connecter.hpp" +#include "stream_engine.hpp" +#include "io_thread.hpp" +#include "err.hpp" +#include "ip.hpp" +#include "tcp.hpp" +#include "address.hpp" +#include "tcp_address.hpp" +#include "session_base.hpp" +#include "ws_engine.hpp" + +#if !defined ZMQ_HAVE_WINDOWS +#include +#include +#include +#include +#include +#include +#include +#include +#ifdef ZMQ_HAVE_VXWORKS +#include +#endif +#ifdef ZMQ_HAVE_OPENVMS +#include +#endif +#endif + +#ifdef __APPLE__ +#include +#endif + +zmq::ws_connecter_t::ws_connecter_t (class io_thread_t *io_thread_, + class session_base_t *session_, + const options_t &options_, + address_t *addr_, + bool delayed_start_) : + stream_connecter_base_t ( + io_thread_, session_, options_, addr_, delayed_start_), + _connect_timer_started (false) +{ + zmq_assert (_addr->protocol == protocol_name::ws); +} + +zmq::ws_connecter_t::~ws_connecter_t () +{ + zmq_assert (!_connect_timer_started); +} + +void zmq::ws_connecter_t::process_term (int linger_) +{ + if (_connect_timer_started) { + cancel_timer (connect_timer_id); + _connect_timer_started = false; + } + + stream_connecter_base_t::process_term (linger_); +} + +void zmq::ws_connecter_t::out_event () +{ + if (_connect_timer_started) { + cancel_timer (connect_timer_id); + _connect_timer_started = false; + } + + // TODO this is still very similar to (t)ipc_connecter_t, maybe the + // differences can be factored out + + rm_handle (); + + const fd_t fd = connect (); + + // Handle the error condition by attempt to reconnect. + if (fd == retired_fd || !tune_socket (fd)) { + close (); + add_reconnect_timer (); + return; + } + + create_engine (fd, get_socket_name (fd, socket_end_local)); +} + +void zmq::ws_connecter_t::timer_event (int id_) +{ + if (id_ == connect_timer_id) { + _connect_timer_started = false; + rm_handle (); + close (); + add_reconnect_timer (); + } else + stream_connecter_base_t::timer_event (id_); +} + +void zmq::ws_connecter_t::start_connecting () +{ + // Open the connecting socket. + const int rc = open (); + + // Connect may succeed in synchronous manner. + if (rc == 0) { + _handle = add_fd (_s); + out_event (); + } + + // Connection establishment may be delayed. Poll for its completion. + else if (rc == -1 && errno == EINPROGRESS) { + _handle = add_fd (_s); + set_pollout (_handle); + _socket->event_connect_delayed ( + make_unconnected_connect_endpoint_pair (_endpoint), zmq_errno ()); + + // add userspace connect timeout + add_connect_timer (); + } + + // Handle any other error condition by eventual reconnect. + else { + if (_s != retired_fd) + close (); + add_reconnect_timer (); + } +} + +void zmq::ws_connecter_t::add_connect_timer () +{ + if (options.connect_timeout > 0) { + add_timer (options.connect_timeout, connect_timer_id); + _connect_timer_started = true; + } +} + +int zmq::ws_connecter_t::open () +{ + zmq_assert (_s == retired_fd); + + // Resolve the address + if (_addr->resolved.tcp_addr != NULL) { + LIBZMQ_DELETE (_addr->resolved.tcp_addr); + } + + _addr->resolved.tcp_addr = new (std::nothrow) tcp_address_t (); + alloc_assert (_addr->resolved.tcp_addr); + _s = tcp_open_socket (_addr->address.c_str (), options, false, true, + _addr->resolved.tcp_addr); + if (_s == retired_fd) { + // TODO we should emit some event in this case! + + LIBZMQ_DELETE (_addr->resolved.tcp_addr); + return -1; + } + zmq_assert (_addr->resolved.tcp_addr != NULL); + + // Set the socket to non-blocking mode so that we get async connect(). + unblock_socket (_s); + + const tcp_address_t *const tcp_addr = _addr->resolved.tcp_addr; + + int rc; + + // Set a source address for conversations + if (tcp_addr->has_src_addr ()) { + // Allow reusing of the address, to connect to different servers + // using the same source port on the client. + int flag = 1; +#ifdef ZMQ_HAVE_WINDOWS + rc = setsockopt (_s, SOL_SOCKET, SO_REUSEADDR, + reinterpret_cast (&flag), sizeof (int)); + wsa_assert (rc != SOCKET_ERROR); +#elif defined ZMQ_HAVE_VXWORKS + rc = setsockopt (_s, SOL_SOCKET, SO_REUSEADDR, (char *) &flag, + sizeof (int)); + errno_assert (rc == 0); +#else + rc = setsockopt (_s, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof (int)); + errno_assert (rc == 0); +#endif + +#if defined ZMQ_HAVE_VXWORKS + rc = ::bind (_s, (sockaddr *) tcp_addr->src_addr (), + tcp_addr->src_addrlen ()); +#else + rc = ::bind (_s, tcp_addr->src_addr (), tcp_addr->src_addrlen ()); +#endif + if (rc == -1) + return -1; + } + + // Connect to the remote peer. +#if defined ZMQ_HAVE_VXWORKS + rc = ::connect (_s, (sockaddr *) tcp_addr->addr (), tcp_addr->addrlen ()); +#else + rc = ::connect (_s, tcp_addr->addr (), tcp_addr->addrlen ()); +#endif + // Connect was successful immediately. + if (rc == 0) { + return 0; + } + + // Translate error codes indicating asynchronous connect has been + // launched to a uniform EINPROGRESS. +#ifdef ZMQ_HAVE_WINDOWS + const int last_error = WSAGetLastError (); + if (last_error == WSAEINPROGRESS || last_error == WSAEWOULDBLOCK) + errno = EINPROGRESS; + else + errno = wsa_error_to_errno (last_error); +#else + if (errno == EINTR) + errno = EINPROGRESS; +#endif + return -1; +} + +zmq::fd_t zmq::ws_connecter_t::connect () +{ + // Async connect has finished. Check whether an error occurred + int err = 0; +#if defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_VXWORKS + int len = sizeof err; +#else + socklen_t len = sizeof err; +#endif + + const int rc = getsockopt (_s, SOL_SOCKET, SO_ERROR, + reinterpret_cast (&err), &len); + + // Assert if the error was caused by 0MQ bug. + // Networking problems are OK. No need to assert. +#ifdef ZMQ_HAVE_WINDOWS + zmq_assert (rc == 0); + if (err != 0) { + if (err == WSAEBADF || err == WSAENOPROTOOPT || err == WSAENOTSOCK + || err == WSAENOBUFS) { + wsa_assert_no (err); + } + return retired_fd; + } +#else + // Following code should handle both Berkeley-derived socket + // implementations and Solaris. + if (rc == -1) + err = errno; + if (err != 0) { + errno = err; +#if !defined(TARGET_OS_IPHONE) || !TARGET_OS_IPHONE + errno_assert (errno != EBADF && errno != ENOPROTOOPT + && errno != ENOTSOCK && errno != ENOBUFS); +#else + errno_assert (errno != ENOPROTOOPT && errno != ENOTSOCK + && errno != ENOBUFS); +#endif + return retired_fd; + } +#endif + + // Return the newly connected socket. + const fd_t result = _s; + _s = retired_fd; + return result; +} + +bool zmq::ws_connecter_t::tune_socket (const fd_t fd_) +{ + const int rc = + tune_tcp_socket (fd_) | tune_tcp_maxrt (fd_, options.tcp_maxrt); + return rc == 0; +} + +void zmq::ws_connecter_t::create_engine (fd_t fd, + const std::string &local_address_) +{ + const endpoint_uri_pair_t endpoint_pair (local_address_, _endpoint, + endpoint_type_connect); + + // Create the engine object for this connection. + ws_engine_t *engine = + new (std::nothrow) ws_engine_t (fd, options, endpoint_pair, true); + alloc_assert (engine); + + // Attach the engine to the corresponding session object. + send_attach (_session, engine); + + // Shut the connecter down. + terminate (); + + _socket->event_connected (endpoint_pair, fd); +} diff --git a/src/ws_connecter.hpp b/src/ws_connecter.hpp new file mode 100644 index 00000000..eb67b0de --- /dev/null +++ b/src/ws_connecter.hpp @@ -0,0 +1,94 @@ +/* + Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file + + This file is part of libzmq, the ZeroMQ core engine in C++. + + libzmq is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License (LGPL) as published + by the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + As a special exception, the Contributors give you permission to link + this library with independent modules to produce an executable, + regardless of the license terms of these independent modules, and to + copy and distribute the resulting executable under terms of your choice, + provided that you also meet, for each linked independent module, the + terms and conditions of the license of that module. An independent + module is a module which is not derived from or based on this library. + If you modify this library, you must extend this exception to your + version of the library. + + libzmq is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#ifndef __WS_CONNECTER_HPP_INCLUDED__ +#define __WS_CONNECTER_HPP_INCLUDED__ + +#include "fd.hpp" +#include "stdint.hpp" +#include "stream_connecter_base.hpp" + +namespace zmq +{ +class ws_connecter_t : public stream_connecter_base_t +{ + public: + // If 'delayed_start' is true connecter first waits for a while, + // then starts connection process. + ws_connecter_t (zmq::io_thread_t *io_thread_, + zmq::session_base_t *session_, + const options_t &options_, + address_t *addr_, + bool delayed_start_); + ~ws_connecter_t (); + + protected: + void create_engine (fd_t fd, const std::string &local_address_); + + private: + // ID of the timer used to check the connect timeout, must be different from stream_connecter_base_t::reconnect_timer_id. + enum + { + connect_timer_id = 2 + }; + + // Handlers for incoming commands. + void process_term (int linger_); + + // Handlers for I/O events. + void out_event (); + void timer_event (int id_); + + // Internal function to start the actual connection establishment. + void start_connecting (); + + // Internal function to add a connect timer + void add_connect_timer (); + + // Open TCP connecting socket. Returns -1 in case of error, + // 0 if connect was successful immediately. Returns -1 with + // EAGAIN errno if async connect was launched. + int open (); + + // Get the file descriptor of newly created connection. Returns + // retired_fd if the connection was unsuccessful. + fd_t connect (); + + // Tunes a connected socket. + bool tune_socket (fd_t fd_); + + // True iff a timer has been started. + bool _connect_timer_started; + + ws_connecter_t (const ws_connecter_t &); + const ws_connecter_t &operator= (const ws_connecter_t &); +}; +} + +#endif diff --git a/src/ws_decoder.cpp b/src/ws_decoder.cpp new file mode 100644 index 00000000..0489af58 --- /dev/null +++ b/src/ws_decoder.cpp @@ -0,0 +1,271 @@ +/* + Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file + + This file is part of libzmq, the ZeroMQ core engine in C++. + + libzmq is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License (LGPL) as published + by the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + As a special exception, the Contributors give you permission to link + this library with independent modules to produce an executable, + regardless of the license terms of these independent modules, and to + copy and distribute the resulting executable under terms of your choice, + provided that you also meet, for each linked independent module, the + terms and conditions of the license of that module. An independent + module is a module which is not derived from or based on this library. + If you modify this library, you must extend this exception to your + version of the library. + + libzmq is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include "precompiled.hpp" +#include +#include +#include + +#include "ws_protocol.hpp" +#include "ws_decoder.hpp" +#include "likely.hpp" +#include "wire.hpp" +#include "err.hpp" + +zmq::ws_decoder_t::ws_decoder_t (size_t bufsize_, + int64_t maxmsgsize_, + bool zero_copy_, + bool must_mask_) : + decoder_base_t (bufsize_), + _msg_flags (0), + _zero_copy (zero_copy_), + _max_msg_size (maxmsgsize_), + _must_mask (must_mask_), + _size (0) +{ + int rc = _in_progress.init (); + errno_assert (rc == 0); + + // At the beginning, read one byte and go to opcode_ready state. + next_step (_tmpbuf, 1, &ws_decoder_t::opcode_ready); +} + +zmq::ws_decoder_t::~ws_decoder_t () +{ + int rc = _in_progress.close (); + errno_assert (rc == 0); +} + +int zmq::ws_decoder_t::opcode_ready (unsigned char const *) +{ + bool final = (_tmpbuf[0] & 0x80) != 0; // final bit + if (!final) + return -1; // non final messages are not supported + + _opcode = (zmq::ws_protocol_t::opcode_t) (_tmpbuf[0] & 0xF); + + _msg_flags = 0; + + switch (_opcode) { + case zmq::ws_protocol_t::opcode_binary: + break; + case zmq::ws_protocol_t::opcode_close: + _msg_flags = msg_t::command; // TODO: set the command name to CLOSE + break; + case zmq::ws_protocol_t::opcode_ping: + _msg_flags = msg_t::ping; + break; + case zmq::ws_protocol_t::opcode_pong: + _msg_flags = msg_t::pong; + break; + default: + return -1; + } + + next_step (_tmpbuf, 1, &ws_decoder_t::size_first_byte_ready); + + return 0; +} + +int zmq::ws_decoder_t::size_first_byte_ready (unsigned char const *read_from_) +{ + bool is_masked = (_tmpbuf[0] & 0x80) != 0; + + if (is_masked != _must_mask) // wrong mask value + return -1; + + _size = (uint64_t) (_tmpbuf[0] & 0x7F); + + if (_size < 126) { + if (_must_mask) + next_step (_tmpbuf, 4, &ws_decoder_t::mask_ready); + else if (_opcode == ws_protocol_t::opcode_binary) { + if (_size == 0) + return -1; + next_step (_tmpbuf, 1, &ws_decoder_t::flags_ready); + } else + return size_ready (read_from_); + } else if (_size == 126) + next_step (_tmpbuf, 2, &ws_decoder_t::short_size_ready); + else + next_step (_tmpbuf, 8, &ws_decoder_t::long_size_ready); + + return 0; +} + + +int zmq::ws_decoder_t::short_size_ready (unsigned char const *read_from_) +{ + _size = (_tmpbuf[0] << 8) | _tmpbuf[1]; + + if (_must_mask) + next_step (_tmpbuf, 4, &ws_decoder_t::mask_ready); + else if (_opcode == ws_protocol_t::opcode_binary) { + if (_size == 0) + return -1; + next_step (_tmpbuf, 1, &ws_decoder_t::flags_ready); + } else + return size_ready (read_from_); + + return 0; +} + +int zmq::ws_decoder_t::long_size_ready (unsigned char const *read_from_) +{ + // The payload size is encoded as 64-bit unsigned integer. + // The most significant byte comes first. + _size = get_uint64 (_tmpbuf); + + if (_must_mask) + next_step (_tmpbuf, 4, &ws_decoder_t::mask_ready); + else if (_opcode == ws_protocol_t::opcode_binary) { + if (_size == 0) + return -1; + next_step (_tmpbuf, 1, &ws_decoder_t::flags_ready); + } else + return size_ready (read_from_); + + return 0; +} + +int zmq::ws_decoder_t::mask_ready (unsigned char const *read_from_) +{ + memcpy (_mask, _tmpbuf, 4); + + if (_opcode == ws_protocol_t::opcode_binary) { + if (_size == 0) + return -1; + + next_step (_tmpbuf, 1, &ws_decoder_t::flags_ready); + } else + return size_ready (read_from_); + + return 0; +} + +int zmq::ws_decoder_t::flags_ready (unsigned char const *read_from_) +{ + unsigned char flags; + + if (_must_mask) + flags = _tmpbuf[0] ^ _mask[0]; + else + flags = _tmpbuf[0]; + + if (flags & ws_protocol_t::more_flag) + _msg_flags |= msg_t::more; + if (flags & ws_protocol_t::command_flag) + _msg_flags |= msg_t::command; + + _size--; + + return size_ready (read_from_); +} + + +int zmq::ws_decoder_t::size_ready (unsigned char const *read_pos_) +{ + // Message size must not exceed the maximum allowed size. + if (_max_msg_size >= 0) + if (unlikely (_size > static_cast (_max_msg_size))) { + errno = EMSGSIZE; + return -1; + } + + // Message size must fit into size_t data type. + if (unlikely (_size != static_cast (_size))) { + errno = EMSGSIZE; + return -1; + } + + int rc = _in_progress.close (); + assert (rc == 0); + + // the current message can exceed the current buffer. We have to copy the buffer + // data into a new message and complete it in the next receive. + + shared_message_memory_allocator &allocator = get_allocator (); + if (unlikely (!_zero_copy + || _size > (size_t) (allocator.data () + allocator.size () + - read_pos_))) { + // a new message has started, but the size would exceed the pre-allocated arena + // this happens every time when a message does not fit completely into the buffer + rc = _in_progress.init_size (static_cast (_size)); + } else { + // construct message using n bytes from the buffer as storage + // increase buffer ref count + // if the message will be a large message, pass a valid refcnt memory location as well + rc = _in_progress.init ( + const_cast (read_pos_), static_cast (_size), + shared_message_memory_allocator::call_dec_ref, allocator.buffer (), + allocator.provide_content ()); + + // For small messages, data has been copied and refcount does not have to be increased + if (_in_progress.is_zcmsg ()) { + allocator.advance_content (); + allocator.inc_ref (); + } + } + + if (unlikely (rc)) { + errno_assert (errno == ENOMEM); + rc = _in_progress.init (); + errno_assert (rc == 0); + errno = ENOMEM; + return -1; + } + + _in_progress.set_flags (_msg_flags); + // this sets read_pos to + // the message data address if the data needs to be copied + // for small message / messages exceeding the current buffer + // or + // to the current start address in the buffer because the message + // was constructed to use n bytes from the address passed as argument + next_step (_in_progress.data (), _in_progress.size (), + &ws_decoder_t::message_ready); + + return 0; +} + +int zmq::ws_decoder_t::message_ready (unsigned char const *) +{ + if (_must_mask) { + int mask_index = _opcode == ws_protocol_t::opcode_binary ? 1 : 0; + + unsigned char *data = (unsigned char *) _in_progress.data (); + for (size_t i = 0; i < _size; ++i, mask_index++) + data[i] = data[i] ^ _mask[mask_index % 4]; + } + + // Message is completely read. Signal this to the caller + // and prepare to decode next message. + next_step (_tmpbuf, 1, &ws_decoder_t::opcode_ready); + return 1; +} diff --git a/src/ws_decoder.hpp b/src/ws_decoder.hpp new file mode 100644 index 00000000..6a5f0a6a --- /dev/null +++ b/src/ws_decoder.hpp @@ -0,0 +1,82 @@ +/* + Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file + + This file is part of libzmq, the ZeroMQ core engine in C++. + + libzmq is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License (LGPL) as published + by the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + As a special exception, the Contributors give you permission to link + this library with independent modules to produce an executable, + regardless of the license terms of these independent modules, and to + copy and distribute the resulting executable under terms of your choice, + provided that you also meet, for each linked independent module, the + terms and conditions of the license of that module. An independent + module is a module which is not derived from or based on this library. + If you modify this library, you must extend this exception to your + version of the library. + + libzmq is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#ifndef __ZMQ_WS_DECODER_HPP_INCLUDED__ +#define __ZMQ_WS_DECODER_HPP_INCLUDED__ + +#include "decoder.hpp" +#include "decoder_allocators.hpp" +#include "ws_protocol.hpp" + +namespace zmq +{ +// Decoder for Web socket framing protocol. Converts data stream into messages. +// The class has to inherit from shared_message_memory_allocator because +// the base class calls allocate in its constructor. +class ws_decoder_t + : public decoder_base_t +{ + public: + ws_decoder_t (size_t bufsize_, + int64_t maxmsgsize_, + bool zero_copy_, + bool must_mask_); + virtual ~ws_decoder_t (); + + // i_decoder interface. + virtual msg_t *msg () { return &_in_progress; } + + private: + int opcode_ready (unsigned char const *); + int size_first_byte_ready (unsigned char const *); + int short_size_ready (unsigned char const *); + int long_size_ready (unsigned char const *); + int mask_ready (unsigned char const *); + int flags_ready (unsigned char const *); + int message_ready (unsigned char const *); + + int size_ready (unsigned char const *); + + unsigned char _tmpbuf[8]; + unsigned char _msg_flags; + msg_t _in_progress; + + const bool _zero_copy; + const int64_t _max_msg_size; + const bool _must_mask; + uint64_t _size; + zmq::ws_protocol_t::opcode_t _opcode; + unsigned char _mask[4]; + + ws_decoder_t (const ws_decoder_t &); + void operator= (const ws_decoder_t &); +}; +} + +#endif diff --git a/src/ws_encoder.cpp b/src/ws_encoder.cpp new file mode 100644 index 00000000..c1b24cb5 --- /dev/null +++ b/src/ws_encoder.cpp @@ -0,0 +1,120 @@ +/* + Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file + + This file is part of libzmq, the ZeroMQ core engine in C++. + + libzmq is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License (LGPL) as published + by the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + As a special exception, the Contributors give you permission to link + this library with independent modules to produce an executable, + regardless of the license terms of these independent modules, and to + copy and distribute the resulting executable under terms of your choice, + provided that you also meet, for each linked independent module, the + terms and conditions of the license of that module. An independent + module is a module which is not derived from or based on this library. + If you modify this library, you must extend this exception to your + version of the library. + + libzmq is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include "precompiled.hpp" +#include "ws_protocol.hpp" +#include "ws_encoder.hpp" +#include "msg.hpp" +#include "likely.hpp" +#include "wire.hpp" +#include "random.hpp" + +#include + +zmq::ws_encoder_t::ws_encoder_t (size_t bufsize_, bool must_mask_) : + encoder_base_t (bufsize_), + _must_mask (must_mask_) +{ + // Write 0 bytes to the batch and go to message_ready state. + next_step (NULL, 0, &ws_encoder_t::message_ready, true); + _masked_msg.init (); +} + +zmq::ws_encoder_t::~ws_encoder_t () +{ + _masked_msg.close (); +} + +void zmq::ws_encoder_t::message_ready () +{ + int offset = 0; + + // TODO: it might be close/ping/pong, which should be different op code + _tmp_buf[offset++] = 0x82; // Final | binary + _tmp_buf[offset] = _must_mask ? 0x80 : 0x00; + + size_t size = in_progress ()->size (); + size++; // TODO: check if binary + + if (size <= 125) + _tmp_buf[offset++] |= (unsigned char) (size & 127); + else if (size <= 0xFFFF) { + _tmp_buf[offset++] |= 126; + _tmp_buf[offset++] = (unsigned char) ((size >> 8) & 0xFF); + _tmp_buf[offset++] = (unsigned char) (size & 0xFF); + } else { + _tmp_buf[offset++] |= 127; + put_uint64 (_tmp_buf + offset, size); + offset += 8; + } + + if (_must_mask) { + uint32_t random = generate_random (); + put_uint32 (_tmp_buf + offset, random); + put_uint32 (_mask, random); + offset += 4; + } + + // TODO: check if binary + + // Encode flags. + unsigned char protocol_flags = 0; + if (in_progress ()->flags () & msg_t::more) + protocol_flags |= ws_protocol_t::more_flag; + if (in_progress ()->flags () & msg_t::command) + protocol_flags |= ws_protocol_t::command_flag; + + _tmp_buf[offset++] = + _must_mask ? protocol_flags ^ _mask[0] : protocol_flags; + + next_step (_tmp_buf, offset, &ws_encoder_t::size_ready, false); +} + +void zmq::ws_encoder_t::size_ready () +{ + if (_must_mask) { + assert (in_progress () != &_masked_msg); + size_t size = in_progress ()->size (); + + _masked_msg.close (); + _masked_msg.init_size (size); + + int mask_index = 1; // TODO: check if binary message + unsigned char *dest = (unsigned char *) _masked_msg.data (); + unsigned char *src = (unsigned char *) in_progress ()->data (); + for (size_t i = 0; i < in_progress ()->size (); ++i, mask_index++) + dest[i] = src[i] ^ _mask[mask_index % 4]; + + next_step (_masked_msg.data (), _masked_msg.size (), + &ws_encoder_t::message_ready, true); + } else { + next_step (in_progress ()->data (), in_progress ()->size (), + &ws_encoder_t::message_ready, true); + } +} diff --git a/src/ws_encoder.hpp b/src/ws_encoder.hpp new file mode 100644 index 00000000..d8dad36c --- /dev/null +++ b/src/ws_encoder.hpp @@ -0,0 +1,59 @@ +/* + Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file + + This file is part of libzmq, the ZeroMQ core engine in C++. + + libzmq is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License (LGPL) as published + by the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + As a special exception, the Contributors give you permission to link + this library with independent modules to produce an executable, + regardless of the license terms of these independent modules, and to + copy and distribute the resulting executable under terms of your choice, + provided that you also meet, for each linked independent module, the + terms and conditions of the license of that module. An independent + module is a module which is not derived from or based on this library. + If you modify this library, you must extend this exception to your + version of the library. + + libzmq is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#ifndef __ZMQ_WS_ENCODER_HPP_INCLUDED__ +#define __ZMQ_WS_ENCODER_HPP_INCLUDED__ + +#include "encoder.hpp" + +namespace zmq +{ +// Encoder for web socket framing protocol. Converts messages into data stream. + +class ws_encoder_t : public encoder_base_t +{ + public: + ws_encoder_t (size_t bufsize_, bool must_mask_); + virtual ~ws_encoder_t (); + + private: + void size_ready (); + void message_ready (); + + unsigned char _tmp_buf[16]; + bool _must_mask; + unsigned char _mask[4]; + msg_t _masked_msg; + + ws_encoder_t (const ws_encoder_t &); + const ws_encoder_t &operator= (const ws_encoder_t &); +}; +} + +#endif diff --git a/src/ws_engine.cpp b/src/ws_engine.cpp new file mode 100644 index 00000000..552b3be0 --- /dev/null +++ b/src/ws_engine.cpp @@ -0,0 +1,1081 @@ +/* +Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file + +This file is part of libzmq, the ZeroMQ core engine in C++. + +libzmq is free software; you can redistribute it and/or modify it under +the terms of the GNU Lesser General Public License (LGPL) as published +by the Free Software Foundation; either version 3 of the License, or +(at your option) any later version. + +As a special exception, the Contributors give you permission to link +this library with independent modules to produce an executable, +regardless of the license terms of these independent modules, and to +copy and distribute the resulting executable under terms of your choice, +provided that you also meet, for each linked independent module, the +terms and conditions of the license of that module. An independent +module is a module which is not derived from or based on this library. +If you modify this library, you must extend this exception to your +version of the library. + +libzmq is distributed in the hope that it will be useful, but WITHOUT +ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public +License for more details. + +You should have received a copy of the GNU Lesser General Public License +along with this program. If not, see . +*/ + +#include "precompiled.hpp" + +#if !defined ZMQ_HAVE_WINDOWS +#include +#include +#include +#include +#include +#ifdef ZMQ_HAVE_VXWORKS +#include +#endif +#endif + +#include "tcp.hpp" +#include "ws_engine.hpp" +#include "session_base.hpp" +#include "err.hpp" +#include "ip.hpp" +#include "random.hpp" +#include "../external/sha1/sha1.h" +#include "ws_decoder.hpp" +#include "ws_encoder.hpp" + +#ifdef ZMQ_HAVE_WINDOWS +#define strcasecmp _stricmp +#endif + +// OSX uses a different name for this socket option +#ifndef IPV6_ADD_MEMBERSHIP +#define IPV6_ADD_MEMBERSHIP IPV6_JOIN_GROUP +#endif + +#ifdef __APPLE__ +#include +#endif + +static int +encode_base64 (const unsigned char *in, int in_len, char *out, int out_len); + +zmq::ws_engine_t::ws_engine_t (fd_t fd_, + const options_t &options_, + const endpoint_uri_pair_t &endpoint_uri_pair_, + bool client_) : + _client (client_), + _plugged (false), + _socket (NULL), + _fd (fd_), + _session (NULL), + _handle (static_cast (NULL)), + _options (options_), + _endpoint_uri_pair (endpoint_uri_pair_), + _handshaking (true), + _client_handshake_state (client_handshake_initial), + _server_handshake_state (handshake_initial), + _header_name_position (0), + _header_value_position (0), + _header_upgrade_websocket (false), + _header_connection_upgrade (false), + _websocket_protocol (false), + _input_stopped (false), + _decoder (NULL), + _inpos (NULL), + _insize (0), + _output_stopped (false), + _outpos (NULL), + _outsize (0), + _encoder (NULL), + _sent_routing_id (false), + _received_routing_id (false) +{ + // Put the socket into non-blocking mode. + unblock_socket (_fd); + + memset (_websocket_key, 0, MAX_HEADER_VALUE_LENGTH + 1); + memset (_websocket_accept, 0, MAX_HEADER_VALUE_LENGTH + 1); + + int rc = _tx_msg.init (); + errno_assert (rc == 0); +} + +zmq::ws_engine_t::~ws_engine_t () +{ + zmq_assert (!_plugged); + + if (_fd != retired_fd) { +#ifdef ZMQ_HAVE_WINDOWS + int rc = closesocket (_fd); + wsa_assert (rc != SOCKET_ERROR); +#else + int rc = close (_fd); + errno_assert (rc == 0); +#endif + _fd = retired_fd; + } + + int rc = _tx_msg.close (); + errno_assert (rc == 0); + + LIBZMQ_DELETE (_encoder); + LIBZMQ_DELETE (_decoder); +} + +void zmq::ws_engine_t::plug (io_thread_t *io_thread_, session_base_t *session_) +{ + zmq_assert (!_plugged); + _plugged = true; + + zmq_assert (!_session); + zmq_assert (session_); + _session = session_; + _socket = _session->get_socket (); + + // Connect to I/O threads poller object. + io_object_t::plug (io_thread_); + _handle = add_fd (_fd); + + if (_client) { + unsigned char nonce[16]; + int *p = (int *) nonce; + + // The nonce doesn't have to be secure one, it is just use to avoid proxy cache + *p = zmq::generate_random (); + *(p + 1) = zmq::generate_random (); + *(p + 2) = zmq::generate_random (); + *(p + 3) = zmq::generate_random (); + + int size = + encode_base64 (nonce, 16, _websocket_key, MAX_HEADER_VALUE_LENGTH); + assert (size > 0); + + size = snprintf ( + (char *) _write_buffer, WS_BUFFER_SIZE, + "GET / HTTP/1.1\r\n" + "Host: server.example.com\r\n" // TODO: we need the address here + "Upgrade: websocket\r\n" + "Connection: Upgrade\r\n" + "Sec-WebSocket-Key: %s\r\n" + "Sec-WebSocket-Protocol: ZWS2.0\r\n" + "Sec-WebSocket-Version: 13\r\n\r\n", + _websocket_key); + assert (size > 0 && size < WS_BUFFER_SIZE); + _outpos = _write_buffer; + _outsize = size; + _output_stopped = false; + set_pollout (_handle); + } else + _output_stopped = true; + + _input_stopped = false; + set_pollin (_handle); + in_event (); +} + + +void zmq::ws_engine_t::unplug () +{ + zmq_assert (_plugged); + _plugged = false; + + rm_fd (_handle); + + // Disconnect from I/O threads poller object. + io_object_t::unplug (); +} + + +void zmq::ws_engine_t::terminate () +{ + unplug (); + delete this; +} + +void zmq::ws_engine_t::in_event () +{ + if (_handshaking) { + if (_client) { + if (!client_handshake ()) + return; + } else if (!server_handshake ()) + return; + } + + zmq_assert (_decoder); + + // If there's no data to process in the buffer... + if (_insize == 0) { + // Retrieve the buffer and read as much data as possible. + // Note that buffer can be arbitrarily large. However, we assume + // the underlying TCP layer has fixed buffer size and thus the + // number of bytes read will be always limited. + size_t bufsize = 0; + _decoder->get_buffer (&_inpos, &bufsize); + + const int rc = tcp_read (_fd, _inpos, bufsize); + + if (rc == 0) { + // connection closed by peer + errno = EPIPE; + error (zmq::stream_engine_t::connection_error); + return; + } + if (rc == -1) { + if (errno != EAGAIN) { + error (zmq::stream_engine_t::connection_error); + return; + } + return; + } + + // Adjust input size + _insize = static_cast (rc); + // Adjust buffer size to received bytes + _decoder->resize_buffer (_insize); + } + + int rc = 0; + size_t processed = 0; + + while (_insize > 0) { + rc = _decoder->decode (_inpos, _insize, processed); + zmq_assert (processed <= _insize); + _inpos += processed; + _insize -= processed; + if (rc == 0 || rc == -1) + break; + + if (!_received_routing_id) { + _received_routing_id = true; + if (_options.recv_routing_id) + _decoder->msg ()->set_flags (msg_t::routing_id); + else { + _decoder->msg ()->close (); + _decoder->msg ()->init (); + continue; + } + } + + rc = _session->push_msg (_decoder->msg ()); + if (rc == -1) + break; + } + + // Tear down the connection if we have failed to decode input data + // or the session has rejected the message. + if (rc == -1) { + if (errno != EAGAIN) { + error (zmq::stream_engine_t::protocol_error); + return; + } + _input_stopped = true; + reset_pollin (_handle); + } + + _session->flush (); + return; +} + +void zmq::ws_engine_t::out_event () +{ + // If write buffer is empty, try to read new data from the encoder. + if (!_outsize) { + // Even when we stop polling as soon as there is no + // data to send, the poller may invoke out_event one + // more time due to 'speculative write' optimisation. + if (unlikely (_encoder == NULL)) { + zmq_assert (_handshaking); + return; + } + + _outpos = NULL; + _outsize = _encoder->encode (&_outpos, 0); + + while (_outsize < static_cast (_options.out_batch_size)) { + if (!_sent_routing_id) { + _tx_msg.close (); + int rc = _tx_msg.init_size (_options.routing_id_size); + errno_assert (rc == 0); + if (_options.routing_id_size > 0) + memcpy (_tx_msg.data (), _options.routing_id, + _options.routing_id_size); + _sent_routing_id = true; + } else if (_session->pull_msg (&_tx_msg) == -1) + break; + _encoder->load_msg (&_tx_msg); + unsigned char *bufptr = _outpos + _outsize; + size_t n = + _encoder->encode (&bufptr, _options.out_batch_size - _outsize); + zmq_assert (n > 0); + if (_outpos == NULL) + _outpos = bufptr; + _outsize += n; + } + + // If there is no data to send, stop polling for output. + if (_outsize == 0) { + _output_stopped = true; + reset_pollout (_handle); + return; + } + } + + // If there are any data to write in write buffer, write as much as + // possible to the socket. Note that amount of data to write can be + // arbitrarily large. However, we assume that underlying TCP layer has + // limited transmission buffer and thus the actual number of bytes + // written should be reasonably modest. + const int nbytes = tcp_write (_fd, _outpos, _outsize); + + // IO error has occurred. We stop waiting for output events. + // The engine is not terminated until we detect input error; + // this is necessary to prevent losing incoming messages. + if (nbytes == -1) { + _output_stopped = true; + reset_pollout (_handle); + return; + } + + _outpos += nbytes; + _outsize -= nbytes; + + // If we are still handshaking and there are no data + // to send, stop polling for output. + if (unlikely (_handshaking)) + if (_outsize == 0) { + _output_stopped = true; + reset_pollout (_handle); + } +} + +const zmq::endpoint_uri_pair_t &zmq::ws_engine_t::get_endpoint () const +{ + return _endpoint_uri_pair; +} + +void zmq::ws_engine_t::restart_output () +{ + if (likely (_output_stopped)) { + set_pollout (_handle); + _output_stopped = false; + } +} + +bool zmq::ws_engine_t::server_handshake () +{ + int nbytes = tcp_read (_fd, _read_buffer, WS_BUFFER_SIZE); + if (nbytes == 0) { + errno = EPIPE; + error (zmq::stream_engine_t::connection_error); + return false; + } else if (nbytes == -1) { + if (errno != EAGAIN) + error (zmq::stream_engine_t::connection_error); + return false; + } + + _inpos = _read_buffer; + _insize = nbytes; + + while (_insize > 0) { + char c = (char) *_inpos; + + switch (_server_handshake_state) { + case handshake_initial: + if (c == 'G') + _server_handshake_state = request_line_G; + else + _server_handshake_state = handshake_error; + break; + case request_line_G: + if (c == 'E') + _server_handshake_state = request_line_GE; + else + _server_handshake_state = handshake_error; + break; + case request_line_GE: + if (c == 'T') + _server_handshake_state = request_line_GET; + else + _server_handshake_state = handshake_error; + break; + case request_line_GET: + if (c == ' ') + _server_handshake_state = request_line_GET_space; + else + _server_handshake_state = handshake_error; + break; + case request_line_GET_space: + if (c == '\r' || c == '\n') + _server_handshake_state = handshake_error; + // TODO: instead of check what is not allowed check what is allowed + if (c != ' ') + _server_handshake_state = request_line_resource; + else + _server_handshake_state = request_line_GET_space; + break; + case request_line_resource: + if (c == '\r' || c == '\n') + _server_handshake_state = handshake_error; + else if (c == ' ') + _server_handshake_state = request_line_resource_space; + else + _server_handshake_state = request_line_resource; + break; + case request_line_resource_space: + if (c == 'H') + _server_handshake_state = request_line_H; + else + _server_handshake_state = handshake_error; + break; + case request_line_H: + if (c == 'T') + _server_handshake_state = request_line_HT; + else + _server_handshake_state = handshake_error; + break; + case request_line_HT: + if (c == 'T') + _server_handshake_state = request_line_HTT; + else + _server_handshake_state = handshake_error; + break; + case request_line_HTT: + if (c == 'P') + _server_handshake_state = request_line_HTTP; + else + _server_handshake_state = handshake_error; + break; + case request_line_HTTP: + if (c == '/') + _server_handshake_state = request_line_HTTP_slash; + else + _server_handshake_state = handshake_error; + break; + case request_line_HTTP_slash: + if (c == '1') + _server_handshake_state = request_line_HTTP_slash_1; + else + _server_handshake_state = handshake_error; + break; + case request_line_HTTP_slash_1: + if (c == '.') + _server_handshake_state = request_line_HTTP_slash_1_dot; + else + _server_handshake_state = handshake_error; + break; + case request_line_HTTP_slash_1_dot: + if (c == '1') + _server_handshake_state = request_line_HTTP_slash_1_dot_1; + else + _server_handshake_state = handshake_error; + break; + case request_line_HTTP_slash_1_dot_1: + if (c == '\r') + _server_handshake_state = request_line_cr; + else + _server_handshake_state = handshake_error; + break; + case request_line_cr: + if (c == '\n') + _server_handshake_state = header_field_begin_name; + else + _server_handshake_state = handshake_error; + break; + case header_field_begin_name: + switch (c) { + case '\r': + _server_handshake_state = handshake_end_line_cr; + break; + case '\n': + _server_handshake_state = handshake_error; + break; + default: + _header_name[0] = (char) c; + _header_name_position = 1; + _server_handshake_state = header_field_name; + break; + } + break; + case header_field_name: + if (c == '\r' || c == '\n') + _server_handshake_state = handshake_error; + else if (c == ':') { + _header_name[_header_name_position] = '\0'; + _server_handshake_state = header_field_colon; + } else if (_header_name_position + 1 > MAX_HEADER_NAME_LENGTH) + _server_handshake_state = handshake_error; + else { + _header_name[_header_name_position] = c; + _header_name_position++; + _server_handshake_state = header_field_name; + } + break; + case header_field_colon: + case header_field_value_trailing_space: + if (c == '\n') + _server_handshake_state = handshake_error; + else if (c == '\r') + _server_handshake_state = header_field_cr; + else if (c == ' ') + _server_handshake_state = header_field_value_trailing_space; + else { + _header_value[0] = c; + _header_value_position = 1; + _server_handshake_state = header_field_value; + } + break; + case header_field_value: + if (c == '\n') + _server_handshake_state = handshake_error; + else if (c == '\r') { + _header_value[_header_value_position] = '\0'; + + if (strcasecmp ("upgrade", _header_name) == 0) + _header_upgrade_websocket = + strcasecmp ("websocket", _header_value) == 0; + else if (strcasecmp ("connection", _header_name) == 0) + _header_connection_upgrade = + strcasecmp ("upgrade", _header_value) == 0; + else if (strcasecmp ("Sec-WebSocket-Key", _header_name) + == 0) + strcpy (_websocket_key, _header_value); + else if (strcasecmp ("Sec-WebSocket-Protocol", _header_name) + == 0) + _websocket_protocol = + true; // TODO: check if the value is ZWS2.0 + + _server_handshake_state = header_field_cr; + } else if (_header_value_position + 1 > MAX_HEADER_VALUE_LENGTH) + _server_handshake_state = handshake_error; + else { + _header_value[_header_value_position] = c; + _header_value_position++; + _server_handshake_state = header_field_value; + } + break; + case header_field_cr: + if (c == '\n') + _server_handshake_state = header_field_begin_name; + else + _server_handshake_state = handshake_error; + break; + case handshake_end_line_cr: + if (c == '\n') { + if (_header_connection_upgrade && _header_upgrade_websocket + && _websocket_protocol && _websocket_key[0] != '\0') { + _server_handshake_state = handshake_complete; + _handshaking = false; + + // TODO: check which decoder/encoder to use according to selected protocol + _encoder = new (std::nothrow) + ws_encoder_t (_options.out_batch_size, false); + alloc_assert (_encoder); + + _decoder = new (std::nothrow) ws_decoder_t ( + _options.in_batch_size, _options.maxmsgsize, + _options.zero_copy, true); + alloc_assert (_decoder); + + _socket->event_handshake_succeeded (_endpoint_uri_pair, + 0); + + const char *magic_string = + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; + char plain[MAX_HEADER_VALUE_LENGTH + 36 + 1]; + strcpy (plain, _websocket_key); + strcat (plain, magic_string); + + sha1_ctxt ctx; + SHA1_Init (&ctx); + SHA1_Update (&ctx, (unsigned char *) _websocket_key, + strlen (_websocket_key)); + SHA1_Update (&ctx, (unsigned char *) magic_string, + strlen (magic_string)); + + unsigned char hash[SHA_DIGEST_LENGTH]; + SHA1_Final (hash, &ctx); + + int accept_key_len = encode_base64 ( + hash, SHA_DIGEST_LENGTH, _websocket_accept, + MAX_HEADER_VALUE_LENGTH); + assert (accept_key_len > 0); + _websocket_accept[accept_key_len] = '\0'; + + int written = + snprintf ((char *) _write_buffer, WS_BUFFER_SIZE, + "HTTP/1.1 101 Switching Protocols\r\n" + "Upgrade: websocket\r\n" + "Connection: Upgrade\r\n" + "Sec-WebSocket-Accept: %s\r\n" + "Sec-WebSocket-Protocol: ZWS2.0\r\n" + "\r\n", + _websocket_accept); + assert (written >= 0 && written < WS_BUFFER_SIZE); + _outpos = _write_buffer; + _outsize = written; + + if (_output_stopped) + restart_output (); + } else + _server_handshake_state = handshake_error; + } else + _server_handshake_state = handshake_error; + break; + case handshake_complete: + // no more bytes are allowed after complete + _server_handshake_state = handshake_error; + default: + assert (false); + } + + _inpos++; + _insize--; + + if (_server_handshake_state == handshake_error) { + // TODO: send bad request + + _socket->event_handshake_failed_protocol ( + _endpoint_uri_pair, ZMQ_PROTOCOL_ERROR_WS_UNSPECIFIED); + + error (zmq::stream_engine_t::protocol_error); + return false; + } + } + return _server_handshake_state == handshake_complete; +} + +bool zmq::ws_engine_t::client_handshake () +{ + int nbytes = tcp_read (_fd, _read_buffer, WS_BUFFER_SIZE); + if (nbytes == 0) { + errno = EPIPE; + error (zmq::stream_engine_t::connection_error); + return false; + } else if (nbytes == -1) { + if (errno != EAGAIN) + error (zmq::stream_engine_t::connection_error); + return false; + } + + _inpos = _read_buffer; + _insize = nbytes; + + while (_insize > 0) { + char c = (char) *_inpos; + + switch (_client_handshake_state) { + case client_handshake_initial: + if (c == 'H') + _client_handshake_state = response_line_H; + else + _client_handshake_state = client_handshake_error; + break; + case response_line_H: + if (c == 'T') + _client_handshake_state = response_line_HT; + else + _client_handshake_state = client_handshake_error; + break; + case response_line_HT: + if (c == 'T') + _client_handshake_state = response_line_HTT; + else + _client_handshake_state = client_handshake_error; + break; + case response_line_HTT: + if (c == 'P') + _client_handshake_state = response_line_HTTP; + else + _client_handshake_state = client_handshake_error; + break; + case response_line_HTTP: + if (c == '/') + _client_handshake_state = response_line_HTTP_slash; + else + _client_handshake_state = client_handshake_error; + break; + case response_line_HTTP_slash: + if (c == '1') + _client_handshake_state = response_line_HTTP_slash_1; + else + _client_handshake_state = client_handshake_error; + break; + case response_line_HTTP_slash_1: + if (c == '.') + _client_handshake_state = response_line_HTTP_slash_1_dot; + else + _client_handshake_state = client_handshake_error; + break; + case response_line_HTTP_slash_1_dot: + if (c == '1') + _client_handshake_state = response_line_HTTP_slash_1_dot_1; + else + _client_handshake_state = client_handshake_error; + break; + case response_line_HTTP_slash_1_dot_1: + if (c == ' ') + _client_handshake_state = + response_line_HTTP_slash_1_dot_1_space; + else + _client_handshake_state = client_handshake_error; + break; + case response_line_HTTP_slash_1_dot_1_space: + if (c == ' ') + _client_handshake_state = + response_line_HTTP_slash_1_dot_1_space; + else if (c == '1') + _client_handshake_state = response_line_status_1; + else + _client_handshake_state = client_handshake_error; + break; + case response_line_status_1: + if (c == '0') + _client_handshake_state = response_line_status_10; + else + _client_handshake_state = client_handshake_error; + break; + case response_line_status_10: + if (c == '1') + _client_handshake_state = response_line_status_101; + else + _client_handshake_state = client_handshake_error; + break; + case response_line_status_101: + if (c == ' ') + _client_handshake_state = response_line_status_101_space; + else + _client_handshake_state = client_handshake_error; + break; + case response_line_status_101_space: + if (c == ' ') + _client_handshake_state = response_line_status_101_space; + else if (c == 'S') + _client_handshake_state = response_line_s; + else + _client_handshake_state = client_handshake_error; + break; + case response_line_s: + if (c == 'w') + _client_handshake_state = response_line_sw; + else + _client_handshake_state = client_handshake_error; + break; + case response_line_sw: + if (c == 'i') + _client_handshake_state = response_line_swi; + else + _client_handshake_state = client_handshake_error; + break; + case response_line_swi: + if (c == 't') + _client_handshake_state = response_line_swit; + else + _client_handshake_state = client_handshake_error; + break; + case response_line_swit: + if (c == 'c') + _client_handshake_state = response_line_switc; + else + _client_handshake_state = client_handshake_error; + break; + case response_line_switc: + if (c == 'h') + _client_handshake_state = response_line_switch; + else + _client_handshake_state = client_handshake_error; + break; + case response_line_switch: + if (c == 'i') + _client_handshake_state = response_line_switchi; + else + _client_handshake_state = client_handshake_error; + break; + case response_line_switchi: + if (c == 'n') + _client_handshake_state = response_line_switchin; + else + _client_handshake_state = client_handshake_error; + break; + case response_line_switchin: + if (c == 'g') + _client_handshake_state = response_line_switching; + else + _client_handshake_state = client_handshake_error; + break; + case response_line_switching: + if (c == ' ') + _client_handshake_state = response_line_switching_space; + else + _client_handshake_state = client_handshake_error; + break; + case response_line_switching_space: + if (c == 'P') + _client_handshake_state = response_line_p; + else + _client_handshake_state = client_handshake_error; + break; + case response_line_p: + if (c == 'r') + _client_handshake_state = response_line_pr; + else + _client_handshake_state = client_handshake_error; + break; + case response_line_pr: + if (c == 'o') + _client_handshake_state = response_line_pro; + else + _client_handshake_state = client_handshake_error; + break; + case response_line_pro: + if (c == 't') + _client_handshake_state = response_line_prot; + else + _client_handshake_state = client_handshake_error; + break; + case response_line_prot: + if (c == 'o') + _client_handshake_state = response_line_proto; + else + _client_handshake_state = client_handshake_error; + break; + case response_line_proto: + if (c == 'c') + _client_handshake_state = response_line_protoc; + else + _client_handshake_state = client_handshake_error; + break; + case response_line_protoc: + if (c == 'o') + _client_handshake_state = response_line_protoco; + else + _client_handshake_state = client_handshake_error; + break; + case response_line_protoco: + if (c == 'l') + _client_handshake_state = response_line_protocol; + else + _client_handshake_state = client_handshake_error; + break; + case response_line_protocol: + if (c == 's') + _client_handshake_state = response_line_protocols; + else + _client_handshake_state = client_handshake_error; + break; + case response_line_protocols: + if (c == '\r') + _client_handshake_state = response_line_cr; + else + _client_handshake_state = client_handshake_error; + break; + case response_line_cr: + if (c == '\n') + _client_handshake_state = client_header_field_begin_name; + else + _client_handshake_state = client_handshake_error; + break; + case client_header_field_begin_name: + switch (c) { + case '\r': + _client_handshake_state = client_handshake_end_line_cr; + break; + case '\n': + _client_handshake_state = client_handshake_error; + break; + default: + _header_name[0] = (char) c; + _header_name_position = 1; + _client_handshake_state = client_header_field_name; + break; + } + break; + case client_header_field_name: + if (c == '\r' || c == '\n') + _client_handshake_state = client_handshake_error; + else if (c == ':') { + _header_name[_header_name_position] = '\0'; + _client_handshake_state = client_header_field_colon; + } else if (_header_name_position + 1 > MAX_HEADER_NAME_LENGTH) + _client_handshake_state = client_handshake_error; + else { + _header_name[_header_name_position] = c; + _header_name_position++; + _client_handshake_state = client_header_field_name; + } + break; + case client_header_field_colon: + case client_header_field_value_trailing_space: + if (c == '\n') + _client_handshake_state = client_handshake_error; + else if (c == '\r') + _client_handshake_state = client_header_field_cr; + else if (c == ' ') + _client_handshake_state = + client_header_field_value_trailing_space; + else { + _header_value[0] = c; + _header_value_position = 1; + _client_handshake_state = client_header_field_value; + } + break; + case client_header_field_value: + if (c == '\n') + _client_handshake_state = client_handshake_error; + else if (c == '\r') { + _header_value[_header_value_position] = '\0'; + + if (strcasecmp ("upgrade", _header_name) == 0) + _header_upgrade_websocket = + strcasecmp ("websocket", _header_value) == 0; + else if (strcasecmp ("connection", _header_name) == 0) + _header_connection_upgrade = + strcasecmp ("upgrade", _header_value) == 0; + else if (strcasecmp ("Sec-WebSocket-Accept", _header_name) + == 0) + strcpy (_websocket_accept, _header_value); + else if (strcasecmp ("Sec-WebSocket-Protocol", _header_name) + == 0) + _websocket_protocol = true; // TODO: check if ZWS2.0 + + _client_handshake_state = client_header_field_cr; + } else if (_header_value_position + 1 > MAX_HEADER_VALUE_LENGTH) + _client_handshake_state = client_handshake_error; + else { + _header_value[_header_value_position] = c; + _header_value_position++; + _client_handshake_state = client_header_field_value; + } + break; + case client_header_field_cr: + if (c == '\n') + _client_handshake_state = client_header_field_begin_name; + else + _client_handshake_state = client_handshake_error; + break; + case client_handshake_end_line_cr: + if (c == '\n') { + if (_header_connection_upgrade && _header_upgrade_websocket + && _websocket_protocol + && _websocket_accept[0] != '\0') { + _client_handshake_state = client_handshake_complete; + _handshaking = false; + + _encoder = new (std::nothrow) + ws_encoder_t (_options.out_batch_size, true); + alloc_assert (_encoder); + + _decoder = new (std::nothrow) ws_decoder_t ( + _options.in_batch_size, _options.maxmsgsize, + _options.zero_copy, false); + alloc_assert (_decoder); + + _socket->event_handshake_succeeded (_endpoint_uri_pair, + 0); + + // TODO: validate accept key + + if (_output_stopped) + restart_output (); + + _inpos++; + _insize--; + + return true; + } else + _client_handshake_state = client_handshake_error; + } else + _client_handshake_state = client_handshake_error; + break; + default: + assert (false); + } + + _inpos++; + _insize--; + + if (_client_handshake_state == client_handshake_error) { + _socket->event_handshake_failed_protocol ( + _endpoint_uri_pair, ZMQ_PROTOCOL_ERROR_WS_UNSPECIFIED); + + error (zmq::stream_engine_t::protocol_error); + return false; + } + } + + return false; +} + +void zmq::ws_engine_t::error (zmq::stream_engine_t::error_reason_t reason_) +{ + zmq_assert (_session); + + if (reason_ != zmq::stream_engine_t::protocol_error && _handshaking) { + int err = errno; + _socket->event_handshake_failed_no_detail (_endpoint_uri_pair, err); + } + + _socket->event_disconnected (_endpoint_uri_pair, _fd); + _session->flush (); + _session->engine_error (reason_); + unplug (); + delete this; +} + +bool zmq::ws_engine_t::restart_input () +{ + zmq_assert (_input_stopped); + + _input_stopped = false; + set_pollin (_handle); + in_event (); + + return true; +} + +static int +encode_base64 (const unsigned char *in, int in_len, char *out, int out_len) +{ + static const unsigned char base64enc_tab[65] = + "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"; + + int ii, io; + uint32_t v; + int rem; + + for (io = 0, ii = 0, v = 0, rem = 0; ii < in_len; ii++) { + unsigned char ch; + ch = in[ii]; + v = (v << 8) | ch; + rem += 8; + while (rem >= 6) { + rem -= 6; + if (io >= out_len) + return -1; /* truncation is failure */ + out[io++] = base64enc_tab[(v >> rem) & 63]; + } + } + if (rem) { + v <<= (6 - rem); + if (io >= out_len) + return -1; /* truncation is failure */ + out[io++] = base64enc_tab[v & 63]; + } + while (io & 3) { + if (io >= out_len) + return -1; /* truncation is failure */ + out[io++] = '='; + } + if (io >= out_len) + return -1; /* no room for null terminator */ + out[io] = 0; + return io; +} diff --git a/src/ws_engine.hpp b/src/ws_engine.hpp new file mode 100644 index 00000000..d3527843 --- /dev/null +++ b/src/ws_engine.hpp @@ -0,0 +1,184 @@ + +#ifndef __ZMQ_WS_ENGINE_HPP_INCLUDED__ +#define __ZMQ_WS_ENGINE_HPP_INCLUDED__ + +#include "io_object.hpp" +#include "i_engine.hpp" +#include "address.hpp" +#include "msg.hpp" +#include "stream_engine.hpp" + +#define WS_BUFFER_SIZE 8192 +#define MAX_HEADER_NAME_LENGTH 1024 +#define MAX_HEADER_VALUE_LENGTH 2048 + +namespace zmq +{ +class io_thread_t; +class session_base_t; + +typedef enum +{ + handshake_initial = 0, + request_line_G, + request_line_GE, + request_line_GET, + request_line_GET_space, + request_line_resource, + request_line_resource_space, + request_line_H, + request_line_HT, + request_line_HTT, + request_line_HTTP, + request_line_HTTP_slash, + request_line_HTTP_slash_1, + request_line_HTTP_slash_1_dot, + request_line_HTTP_slash_1_dot_1, + request_line_cr, + header_field_begin_name, + header_field_name, + header_field_colon, + header_field_value_trailing_space, + header_field_value, + header_field_cr, + handshake_end_line_cr, + handshake_complete, + + handshake_error = -1 +} ws_server_handshake_state_t; + + +typedef enum +{ + client_handshake_initial = 0, + response_line_H, + response_line_HT, + response_line_HTT, + response_line_HTTP, + response_line_HTTP_slash, + response_line_HTTP_slash_1, + response_line_HTTP_slash_1_dot, + response_line_HTTP_slash_1_dot_1, + response_line_HTTP_slash_1_dot_1_space, + response_line_status_1, + response_line_status_10, + response_line_status_101, + response_line_status_101_space, + response_line_s, + response_line_sw, + response_line_swi, + response_line_swit, + response_line_switc, + response_line_switch, + response_line_switchi, + response_line_switchin, + response_line_switching, + response_line_switching_space, + response_line_p, + response_line_pr, + response_line_pro, + response_line_prot, + response_line_proto, + response_line_protoc, + response_line_protoco, + response_line_protocol, + response_line_protocols, + response_line_cr, + client_header_field_begin_name, + client_header_field_name, + client_header_field_colon, + client_header_field_value_trailing_space, + client_header_field_value, + client_header_field_cr, + client_handshake_end_line_cr, + client_handshake_complete, + + client_handshake_error = -1 +} ws_client_handshake_state_t; + +class ws_engine_t : public io_object_t, public i_engine +{ + public: + ws_engine_t (fd_t fd_, + const options_t &options_, + const endpoint_uri_pair_t &endpoint_uri_pair_, + bool client_); + ~ws_engine_t (); + + // i_engine interface implementation. + // Plug the engine to the session. + void plug (zmq::io_thread_t *io_thread_, class session_base_t *session_); + + // Terminate and deallocate the engine. Note that 'detached' + // events are not fired on termination. + void terminate (); + + // This method is called by the session to signalise that more + // messages can be written to the pipe. + bool restart_input (); + + // This method is called by the session to signalise that there + // are messages to send available. + void restart_output (); + + void zap_msg_available (){}; + + void in_event (); + void out_event (); + + const endpoint_uri_pair_t &get_endpoint () const; + + private: + bool client_handshake (); + bool server_handshake (); + void error (zmq::stream_engine_t::error_reason_t reason_); + void unplug (); + + bool _client; + bool _plugged; + + socket_base_t *_socket; + fd_t _fd; + session_base_t *_session; + handle_t _handle; + + options_t _options; + + // Representation of the connected endpoints. + const endpoint_uri_pair_t _endpoint_uri_pair; + + bool _handshaking; + ws_client_handshake_state_t _client_handshake_state; + ws_server_handshake_state_t _server_handshake_state; + + unsigned char _read_buffer[WS_BUFFER_SIZE]; + unsigned char _write_buffer[WS_BUFFER_SIZE]; + char _header_name[MAX_HEADER_NAME_LENGTH + 1]; + int _header_name_position; + char _header_value[MAX_HEADER_VALUE_LENGTH + 1]; + int _header_value_position; + + bool _header_upgrade_websocket; + bool _header_connection_upgrade; + bool _websocket_protocol; + char _websocket_key[MAX_HEADER_VALUE_LENGTH + 1]; + char _websocket_accept[MAX_HEADER_VALUE_LENGTH + 1]; + + bool _input_stopped; + i_decoder *_decoder; + unsigned char *_inpos; + size_t _insize; + + bool _output_stopped; + unsigned char *_outpos; + size_t _outsize; + i_encoder *_encoder; + + bool _sent_routing_id; + bool _received_routing_id; + + msg_t _tx_msg; +}; +} + +#endif diff --git a/src/ws_listener.cpp b/src/ws_listener.cpp new file mode 100644 index 00000000..6d3b49f4 --- /dev/null +++ b/src/ws_listener.cpp @@ -0,0 +1,272 @@ +/* + Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file + + This file is part of libzmq, the ZeroMQ core engine in C++. + + libzmq is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License (LGPL) as published + by the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + As a special exception, the Contributors give you permission to link + this library with independent modules to produce an executable, + regardless of the license terms of these independent modules, and to + copy and distribute the resulting executable under terms of your choice, + provided that you also meet, for each linked independent module, the + terms and conditions of the license of that module. An independent + module is a module which is not derived from or based on this library. + If you modify this library, you must extend this exception to your + version of the library. + + libzmq is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include "precompiled.hpp" +#include + +#include +#include + +#include "ws_listener.hpp" +#include "io_thread.hpp" +#include "config.hpp" +#include "err.hpp" +#include "ip.hpp" +#include "tcp.hpp" +#include "socket_base.hpp" +#include "address.hpp" +#include "ws_engine.hpp" +#include "session_base.hpp" + +#ifndef ZMQ_HAVE_WINDOWS +#include +#include +#include +#include +#include +#include +#include +#ifdef ZMQ_HAVE_VXWORKS +#include +#endif +#endif + +#ifdef ZMQ_HAVE_OPENVMS +#include +#endif + +zmq::ws_listener_t::ws_listener_t (io_thread_t *io_thread_, + socket_base_t *socket_, + const options_t &options_) : + stream_listener_base_t (io_thread_, socket_, options_) +{ +} + +void zmq::ws_listener_t::in_event () +{ + fd_t fd = accept (); + + // If connection was reset by the peer in the meantime, just ignore it. + // TODO: Handle specific errors like ENFILE/EMFILE etc. + if (fd == retired_fd) { + _socket->event_accept_failed ( + make_unconnected_bind_endpoint_pair (_endpoint), zmq_errno ()); + return; + } + + int rc = tune_tcp_socket (fd); + rc = rc | tune_tcp_maxrt (fd, options.tcp_maxrt); + if (rc != 0) { + _socket->event_accept_failed ( + make_unconnected_bind_endpoint_pair (_endpoint), zmq_errno ()); + return; + } + + // Create the engine object for this connection. + create_engine (fd); +} + +std::string zmq::ws_listener_t::get_socket_name (zmq::fd_t fd_, + socket_end_t socket_end_) const +{ + return zmq::get_socket_name (fd_, socket_end_); +} + +int zmq::ws_listener_t::create_socket (const char *addr_) +{ + _s = tcp_open_socket (addr_, options, true, true, &_address); + if (_s == retired_fd) { + return -1; + } + + // TODO why is this only done for the listener? + make_socket_noninheritable (_s); + + // Allow reusing of the address. + int flag = 1; + int rc; +#ifdef ZMQ_HAVE_WINDOWS + // TODO this was changed for Windows from SO_REUSEADDRE to + // SE_EXCLUSIVEADDRUSE by 0ab65324195ad70205514d465b03d851a6de051c, + // so the comment above is no longer correct; also, now the settings are + // different between listener and connecter with a src address. + // is this intentional? + rc = setsockopt (_s, SOL_SOCKET, SO_EXCLUSIVEADDRUSE, + reinterpret_cast (&flag), sizeof (int)); + wsa_assert (rc != SOCKET_ERROR); +#elif defined ZMQ_HAVE_VXWORKS + rc = + setsockopt (_s, SOL_SOCKET, SO_REUSEADDR, (char *) &flag, sizeof (int)); + errno_assert (rc == 0); +#else + rc = setsockopt (_s, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof (int)); + errno_assert (rc == 0); +#endif + + // Bind the socket to the network interface and port. +#if defined ZMQ_HAVE_VXWORKS + rc = bind (_s, (sockaddr *) _address.addr (), _address.addrlen ()); +#else + rc = bind (_s, _address.addr (), _address.addrlen ()); +#endif +#ifdef ZMQ_HAVE_WINDOWS + if (rc == SOCKET_ERROR) { + errno = wsa_error_to_errno (WSAGetLastError ()); + goto error; + } +#else + if (rc != 0) + goto error; +#endif + + // Listen for incoming connections. + rc = listen (_s, options.backlog); +#ifdef ZMQ_HAVE_WINDOWS + if (rc == SOCKET_ERROR) { + errno = wsa_error_to_errno (WSAGetLastError ()); + goto error; + } +#else + if (rc != 0) + goto error; +#endif + + return 0; + +error: + int err = errno; + close (); + errno = err; + return -1; +} + +int zmq::ws_listener_t::set_local_address (const char *addr_) +{ + if (options.use_fd != -1) { + // in this case, the addr_ passed is not used and ignored, since the + // socket was already created by the application + _s = options.use_fd; + } else { + if (create_socket (addr_) == -1) + return -1; + } + + _endpoint = get_socket_name (_s, socket_end_local); + + _socket->event_listening (make_unconnected_bind_endpoint_pair (_endpoint), + _s); + return 0; +} + +zmq::fd_t zmq::ws_listener_t::accept () +{ + // The situation where connection cannot be accepted due to insufficient + // resources is considered valid and treated by ignoring the connection. + // Accept one connection and deal with different failure modes. + zmq_assert (_s != retired_fd); + + struct sockaddr_storage ss; + memset (&ss, 0, sizeof (ss)); +#if defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_VXWORKS + int ss_len = sizeof (ss); +#else + socklen_t ss_len = sizeof (ss); +#endif +#if defined ZMQ_HAVE_SOCK_CLOEXEC && defined HAVE_ACCEPT4 + fd_t sock = ::accept4 (_s, reinterpret_cast (&ss), + &ss_len, SOCK_CLOEXEC); +#else + fd_t sock = + ::accept (_s, reinterpret_cast (&ss), &ss_len); +#endif + + if (sock == retired_fd) { +#if defined ZMQ_HAVE_WINDOWS + const int last_error = WSAGetLastError (); + wsa_assert (last_error == WSAEWOULDBLOCK || last_error == WSAECONNRESET + || last_error == WSAEMFILE || last_error == WSAENOBUFS); +#elif defined ZMQ_HAVE_ANDROID + errno_assert (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR + || errno == ECONNABORTED || errno == EPROTO + || errno == ENOBUFS || errno == ENOMEM || errno == EMFILE + || errno == ENFILE || errno == EINVAL); +#else + errno_assert (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR + || errno == ECONNABORTED || errno == EPROTO + || errno == ENOBUFS || errno == ENOMEM || errno == EMFILE + || errno == ENFILE); +#endif + return retired_fd; + } + + make_socket_noninheritable (sock); + + if (zmq::set_nosigpipe (sock)) { +#ifdef ZMQ_HAVE_WINDOWS + int rc = closesocket (sock); + wsa_assert (rc != SOCKET_ERROR); +#else + int rc = ::close (sock); + errno_assert (rc == 0); +#endif + return retired_fd; + } + + // Set the IP Type-Of-Service priority for this client socket + if (options.tos != 0) + set_ip_type_of_service (sock, options.tos); + + return sock; +} + +void zmq::ws_listener_t::create_engine (fd_t fd) +{ + const endpoint_uri_pair_t endpoint_pair ( + get_socket_name (fd, socket_end_local), + get_socket_name (fd, socket_end_remote), endpoint_type_bind); + + ws_engine_t *engine = + new (std::nothrow) ws_engine_t (fd, options, endpoint_pair, false); + alloc_assert (engine); + + // Choose I/O thread to run connecter in. Given that we are already + // running in an I/O thread, there must be at least one available. + io_thread_t *io_thread = choose_io_thread (options.affinity); + zmq_assert (io_thread); + + // Create and launch a session object. + session_base_t *session = + session_base_t::create (io_thread, false, _socket, options, NULL); + errno_assert (session); + session->inc_seqnum (); + launch_child (session); + send_attach (session, engine, false); + + _socket->event_accepted (endpoint_pair, fd); +} diff --git a/src/ws_listener.hpp b/src/ws_listener.hpp new file mode 100644 index 00000000..47a3f744 --- /dev/null +++ b/src/ws_listener.hpp @@ -0,0 +1,73 @@ +/* + Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file + + This file is part of libzmq, the ZeroMQ core engine in C++. + + libzmq is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License (LGPL) as published + by the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + As a special exception, the Contributors give you permission to link + this library with independent modules to produce an executable, + regardless of the license terms of these independent modules, and to + copy and distribute the resulting executable under terms of your choice, + provided that you also meet, for each linked independent module, the + terms and conditions of the license of that module. An independent + module is a module which is not derived from or based on this library. + If you modify this library, you must extend this exception to your + version of the library. + + libzmq is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#ifndef __ZMQ_WS_LISTENER_HPP_INCLUDED__ +#define __ZMQ_WS_LISTENER_HPP_INCLUDED__ + +#include "fd.hpp" +#include "tcp_address.hpp" +#include "stream_listener_base.hpp" + +namespace zmq +{ +class ws_listener_t : public stream_listener_base_t +{ + public: + ws_listener_t (zmq::io_thread_t *io_thread_, + zmq::socket_base_t *socket_, + const options_t &options_); + + // Set address to listen on. + int set_local_address (const char *addr_); + + protected: + std::string get_socket_name (fd_t fd_, socket_end_t socket_end_) const; + void create_engine (fd_t fd); + + private: + // Handlers for I/O events. + void in_event (); + + // Accept the new connection. Returns the file descriptor of the + // newly created connection. The function may return retired_fd + // if the connection was dropped while waiting in the listen backlog + // or was denied because of accept filters. + fd_t accept (); + + int create_socket (const char *addr_); + + // Address to listen on. + tcp_address_t _address; + + ws_listener_t (const ws_listener_t &); + const ws_listener_t &operator= (const ws_listener_t &); +}; +} + +#endif diff --git a/src/ws_protocol.hpp b/src/ws_protocol.hpp new file mode 100644 index 00000000..5642aa07 --- /dev/null +++ b/src/ws_protocol.hpp @@ -0,0 +1,58 @@ +/* + Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file + + This file is part of libzmq, the ZeroMQ core engine in C++. + + libzmq is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License (LGPL) as published + by the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + As a special exception, the Contributors give you permission to link + this library with independent modules to produce an executable, + regardless of the license terms of these independent modules, and to + copy and distribute the resulting executable under terms of your choice, + provided that you also meet, for each linked independent module, the + terms and conditions of the license of that module. An independent + module is a module which is not derived from or based on this library. + If you modify this library, you must extend this exception to your + version of the library. + + libzmq is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#ifndef __ZMQ_WS_PROTOCOL_HPP_INCLUDED__ +#define __ZMQ_WS_PROTOCOL_HPP_INCLUDED__ + +namespace zmq +{ +// Definition of constants for WS transport protocol. +class ws_protocol_t +{ + public: + // Message flags. + enum opcode_t + { + opcode_continuation = 0, + opcode_text = 0x01, + opcode_binary = 0x02, + opcode_close = 0x08, + opcode_ping = 0x09, + opcode_pong = 0xA + }; + + enum + { + more_flag = 1, + command_flag = 2 + }; +}; +} + +#endif diff --git a/tests/test_ws_transport.cpp b/tests/test_ws_transport.cpp new file mode 100644 index 00000000..b1f61aae --- /dev/null +++ b/tests/test_ws_transport.cpp @@ -0,0 +1,117 @@ +/* + Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file + + This file is part of libzmq, the ZeroMQ core engine in C++. + + libzmq is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License (LGPL) as published + by the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + As a special exception, the Contributors give you permission to link + this library with independent modules to produce an executable, + regardless of the license terms of these independent modules, and to + copy and distribute the resulting executable under terms of your choice, + provided that you also meet, for each linked independent module, the + terms and conditions of the license of that module. An independent + module is a module which is not derived from or based on this library. + If you modify this library, you must extend this exception to your + version of the library. + + libzmq is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include "testutil.hpp" +#include "testutil_unity.hpp" + +SETUP_TEARDOWN_TESTCONTEXT + +void test_roundtrip () +{ + void *sb = test_context_socket (ZMQ_REP); + TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (sb, "ws://*:5556")); + + void *sc = test_context_socket (ZMQ_REQ); + TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sc, "ws://127.0.0.1:5556")); + + bounce (sb, sc); + + test_context_socket_close (sc); + test_context_socket_close (sb); +} + +void test_short_message () +{ + void *sb = test_context_socket (ZMQ_REP); + TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (sb, "ws://*:5557")); + + void *sc = test_context_socket (ZMQ_REQ); + TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sc, "ws://127.0.0.1:5557")); + + zmq_msg_t msg; + TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init_size (&msg, 255)); + + for (unsigned char i = 0; i < 255; ++i) + ((unsigned char *) zmq_msg_data (&msg))[i] = i; + + int rc = zmq_msg_send (&msg, sc, 0); + TEST_ASSERT_EQUAL_INT (255, rc); + + rc = zmq_msg_recv (&msg, sb, 0); + TEST_ASSERT_EQUAL_INT (255, rc); + + for (unsigned char i = 0; i < 255; ++i) + TEST_ASSERT_EQUAL_INT (i, ((unsigned char *) zmq_msg_data (&msg))[i]); + + TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&msg)); + + test_context_socket_close (sc); + test_context_socket_close (sb); +} + +void test_large_message () +{ + void *sb = test_context_socket (ZMQ_REP); + TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (sb, "ws://*:5557")); + + void *sc = test_context_socket (ZMQ_REQ); + TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sc, "ws://127.0.0.1:5557")); + + zmq_msg_t msg; + TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init_size (&msg, 65536)); + + for (int i = 0; i < 65536; ++i) + ((unsigned char *) zmq_msg_data (&msg))[i] = i % 255; + + int rc = zmq_msg_send (&msg, sc, 0); + TEST_ASSERT_EQUAL_INT (65536, rc); + + rc = zmq_msg_recv (&msg, sb, 0); + TEST_ASSERT_EQUAL_INT (65536, rc); + + for (int i = 0; i < 65536; ++i) + TEST_ASSERT_EQUAL_INT (i % 255, + ((unsigned char *) zmq_msg_data (&msg))[i]); + + TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&msg)); + + test_context_socket_close (sc); + test_context_socket_close (sb); +} + +int main () +{ + setup_test_environment (); + + UNITY_BEGIN (); + RUN_TEST (test_roundtrip); + RUN_TEST (test_short_message); + RUN_TEST (test_large_message); + return UNITY_END (); +}