From 7cfd03ba72dd91d2e8f1b246bf68dbf43ec135f4 Mon Sep 17 00:00:00 2001 From: Martin Hurton Date: Sun, 27 May 2012 15:15:09 +0200 Subject: [PATCH 1/7] Remove unplug from i_engine interface --- src/i_engine.hpp | 3 --- src/pgm_receiver.hpp | 4 +++- src/pgm_sender.hpp | 4 +++- src/stream_engine.hpp | 4 +++- 4 files changed, 9 insertions(+), 6 deletions(-) diff --git a/src/i_engine.hpp b/src/i_engine.hpp index 93cd8b10..8ae351a4 100644 --- a/src/i_engine.hpp +++ b/src/i_engine.hpp @@ -37,9 +37,6 @@ namespace zmq virtual void plug (zmq::io_thread_t *io_thread_, class session_base_t *session_) = 0; - // Unplug the engine from the session. - virtual void unplug () = 0; - // Terminate and deallocate the engine. Note that 'detached' // events are not fired on termination. virtual void terminate () = 0; diff --git a/src/pgm_receiver.hpp b/src/pgm_receiver.hpp index 7c97d5a0..eb50128b 100644 --- a/src/pgm_receiver.hpp +++ b/src/pgm_receiver.hpp @@ -59,7 +59,6 @@ namespace zmq // i_engine interface implementation. void plug (zmq::io_thread_t *io_thread_, zmq::session_base_t *session_); - void unplug (); void terminate (); void activate_in (); void activate_out (); @@ -70,6 +69,9 @@ namespace zmq private: + // Unplug the engine from the session. + void unplug (); + // PGM is not able to move subscriptions upstream. Thus, drop all // the pending subscriptions. void drop_subscriptions (); diff --git a/src/pgm_sender.hpp b/src/pgm_sender.hpp index 99405f07..afe030d4 100644 --- a/src/pgm_sender.hpp +++ b/src/pgm_sender.hpp @@ -57,7 +57,6 @@ namespace zmq // i_engine interface implementation. void plug (zmq::io_thread_t *io_thread_, zmq::session_base_t *session_); - void unplug (); void terminate (); void activate_in (); void activate_out (); @@ -69,6 +68,9 @@ namespace zmq private: + // Unplug the engine from the session. + void unplug (); + // TX and RX timeout timer ID's. enum {tx_timer_id = 0xa0, rx_timer_id = 0xa1}; diff --git a/src/stream_engine.hpp b/src/stream_engine.hpp index 939c2ed5..02d00d24 100644 --- a/src/stream_engine.hpp +++ b/src/stream_engine.hpp @@ -51,7 +51,6 @@ namespace zmq // i_engine interface implementation. void plug (zmq::io_thread_t *io_thread_, zmq::session_base_t *session_); - void unplug (); void terminate (); void activate_in (); void activate_out (); @@ -62,6 +61,9 @@ namespace zmq private: + // Unplug the engine from the session. + void unplug (); + // Function to handle network disconnections. void error (); From 52ed4cdccf16567674e9c949da74b484b596f8a8 Mon Sep 17 00:00:00 2001 From: Martin Hurton Date: Sun, 27 May 2012 14:51:12 +0200 Subject: [PATCH 2/7] Remove dead code --- src/session_base.cpp | 8 +------- src/stream_engine.cpp | 28 ++++------------------------ 2 files changed, 5 insertions(+), 31 deletions(-) diff --git a/src/session_base.cpp b/src/session_base.cpp index 681b1455..c13308aa 100644 --- a/src/session_base.cpp +++ b/src/session_base.cpp @@ -280,13 +280,7 @@ void zmq::session_base_t::process_plug () void zmq::session_base_t::process_attach (i_engine *engine_) { - // If some other object (e.g. init) notifies us that the connection failed - // without creating an engine we need to start the reconnection process. - if (!engine_) { - zmq_assert (!engine); - detached (); - return; - } + zmq_assert (engine_ != NULL); // Create the pipe if it does not exist yet. if (!pipe && !is_terminating ()) { diff --git a/src/stream_engine.cpp b/src/stream_engine.cpp index 1161c292..8142ce5d 100644 --- a/src/stream_engine.cpp +++ b/src/stream_engine.cpp @@ -52,7 +52,6 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_) : outsize (0), encoder (out_batch_size), session (NULL), - leftover_session (NULL), options (options_), plugged (false) { @@ -109,7 +108,6 @@ void zmq::stream_engine_t::plug (io_thread_t *io_thread_, { zmq_assert (!plugged); plugged = true; - leftover_session = NULL; // Connect to session object. zmq_assert (!session); @@ -144,7 +142,6 @@ void zmq::stream_engine_t::unplug () // Disconnect from session object. encoder.set_session (NULL); decoder.set_session (NULL); - leftover_session = session; session = NULL; endpoint.clear(); } @@ -185,12 +182,8 @@ void zmq::stream_engine_t::in_event () else { // Stop polling for input if we got stuck. - if (processed < insize) { - - // This may happen if queue limits are in effect. - if (plugged) - reset_pollin (handle); - } + if (processed < insize) + reset_pollin (handle); // Adjust the buffer. inpos += processed; @@ -198,20 +191,14 @@ void zmq::stream_engine_t::in_event () } // Flush all messages the decoder may have produced. - // If IO handler has unplugged engine, flush transient IO handler. - if (unlikely (!plugged)) { - zmq_assert (leftover_session); - leftover_session->flush (); - } else { - session->flush (); - } + session->flush (); // Input error has occurred. If the last decoded // message has already been accepted, we terminate // the engine immediately. Otherwise, we stop // waiting for input events and postpone the termination // until after the session has accepted the message. - if (session != NULL && disconnection) { + if (disconnection) { input_error = true; if (decoder.stalled ()) reset_pollin (handle); @@ -228,13 +215,6 @@ void zmq::stream_engine_t::out_event () outpos = NULL; encoder.get_data (&outpos, &outsize); - // If IO handler has unplugged engine, flush transient IO handler. - if (unlikely (!plugged)) { - zmq_assert (leftover_session); - leftover_session->flush (); - return; - } - // If there is no data to send, stop polling for output. if (outsize == 0) { reset_pollout (handle); From a8f9a0d8918c6ede1617beda9c65ff70b5a1977a Mon Sep 17 00:00:00 2001 From: Martin Hurton Date: Sun, 27 May 2012 15:31:59 +0200 Subject: [PATCH 3/7] Use zmq_assert rather then assert --- src/devpoll.cpp | 4 ++-- src/poll.cpp | 4 ++-- src/router.cpp | 6 +++--- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/src/devpoll.cpp b/src/devpoll.cpp index 0c46d14b..97c1ec04 100644 --- a/src/devpoll.cpp +++ b/src/devpoll.cpp @@ -70,7 +70,7 @@ zmq::devpoll_t::handle_t zmq::devpoll_t::add_fd (fd_t fd_, } } - assert (!fd_table [fd_].valid); + zmq_assert (!fd_table [fd_].valid); fd_table [fd_].events = 0; fd_table [fd_].reactor = reactor_; @@ -88,7 +88,7 @@ zmq::devpoll_t::handle_t zmq::devpoll_t::add_fd (fd_t fd_, void zmq::devpoll_t::rm_fd (handle_t handle_) { - assert (fd_table [handle_].valid); + zmq_assert (fd_table [handle_].valid); devpoll_ctl (handle_, POLLREMOVE); fd_table [handle_].valid = false; diff --git a/src/poll.cpp b/src/poll.cpp index 1d1c4233..de7e0da3 100644 --- a/src/poll.cpp +++ b/src/poll.cpp @@ -57,7 +57,7 @@ zmq::poll_t::handle_t zmq::poll_t::add_fd (fd_t fd_, i_poll_events *events_) pollfd pfd = {fd_, 0, 0}; pollset.push_back (pfd); - assert (fd_table [fd_].index == retired_fd); + zmq_assert (fd_table [fd_].index == retired_fd); fd_table [fd_].index = pollset.size() - 1; fd_table [fd_].events = events_; @@ -71,7 +71,7 @@ zmq::poll_t::handle_t zmq::poll_t::add_fd (fd_t fd_, i_poll_events *events_) void zmq::poll_t::rm_fd (handle_t handle_) { fd_t index = fd_table [handle_].index; - assert (index != retired_fd); + zmq_assert (index != retired_fd); // Mark the fd as unused. pollset [index].fd = retired_fd; diff --git a/src/router.cpp b/src/router.cpp index c8877280..3a45de6f 100644 --- a/src/router.cpp +++ b/src/router.cpp @@ -219,8 +219,8 @@ int zmq::router_t::xrecv (msg_t *msg_, int flags_) } // Identity is not expected - assert ((msg_->flags () & msg_t::identity) == 0); - assert (pipe != NULL); + zmq_assert ((msg_->flags () & msg_t::identity) == 0); + zmq_assert (pipe != NULL); // If we are in the middle of reading a message, just return the next part. if (more_in) @@ -273,7 +273,7 @@ bool zmq::router_t::xhas_in () return false; // Identity is not expected - assert ((prefetched_msg.flags () & msg_t::identity) == 0); + zmq_assert ((prefetched_msg.flags () & msg_t::identity) == 0); blob_t identity = pipe->get_identity (); rc = prefetched_id.init_size (identity.size ()); From 13ef1e4f268452a076205d9c974a33e111401ef2 Mon Sep 17 00:00:00 2001 From: Martin Hurton Date: Sun, 27 May 2012 16:10:19 +0200 Subject: [PATCH 4/7] Make wsa_error_to_errno pure function --- src/err.cpp | 38 ++++++++++++++------------------------ src/err.hpp | 2 +- src/tcp_connecter.cpp | 4 ++-- src/tcp_listener.cpp | 8 ++++---- src/zmq.cpp | 12 +++++------- 5 files changed, 26 insertions(+), 38 deletions(-) diff --git a/src/err.cpp b/src/err.cpp index 8a25dbc7..eab242fa 100644 --- a/src/err.cpp +++ b/src/err.cpp @@ -214,46 +214,36 @@ void zmq::win_error (char *buffer_, size_t buffer_size_) zmq_assert (rc); } -void zmq::wsa_error_to_errno () +int zmq::wsa_error_to_errno (int errcode) { - int errcode = WSAGetLastError (); switch (errcode) { case WSAEINPROGRESS: - errno = EAGAIN; - return; + return EAGAIN; case WSAEBADF: - errno = EBADF; - return; + return EBADF; case WSAEINVAL: - errno = EINVAL; - return; + return EINVAL; case WSAEMFILE: - errno = EMFILE; - return; + return EMFILE; case WSAEFAULT: - errno = EFAULT; - return; + return EFAULT; case WSAEPROTONOSUPPORT: - errno = EPROTONOSUPPORT; - return; + return EPROTONOSUPPORT; case WSAENOBUFS: - errno = ENOBUFS; - return; + return ENOBUFS; case WSAENETDOWN: - errno = ENETDOWN; - return; + return ENETDOWN; case WSAEADDRINUSE: - errno = EADDRINUSE; - return; + return EADDRINUSE; case WSAEADDRNOTAVAIL: - errno = EADDRNOTAVAIL; - return; + return EADDRNOTAVAIL; case WSAEAFNOSUPPORT: - errno = EAFNOSUPPORT; - return; + return EAFNOSUPPORT; default: wsa_assert (false); } + // Not reachable + return 0; } #endif diff --git a/src/err.hpp b/src/err.hpp index cdae4e9d..b25efc06 100644 --- a/src/err.hpp +++ b/src/err.hpp @@ -57,7 +57,7 @@ namespace zmq const char *wsa_error (); const char *wsa_error_no (int no_); void win_error (char *buffer_, size_t buffer_size_); - void wsa_error_to_errno (); + int wsa_error_to_errno (int errcode); } // Provides convenient way to check WSA-style errors on Windows. diff --git a/src/tcp_connecter.cpp b/src/tcp_connecter.cpp index a2f96586..f74931a8 100644 --- a/src/tcp_connecter.cpp +++ b/src/tcp_connecter.cpp @@ -193,7 +193,7 @@ int zmq::tcp_connecter_t::open () s = open_socket (addr->resolved.tcp_addr->family (), SOCK_STREAM, IPPROTO_TCP); #ifdef ZMQ_HAVE_WINDOWS if (s == INVALID_SOCKET) { - wsa_error_to_errno (); + errno = wsa_error_to_errno (WSAGetLastError ()); return -1; } #else @@ -226,7 +226,7 @@ int zmq::tcp_connecter_t::open () errno = EINPROGRESS; return -1; } - wsa_error_to_errno (); + errno = wsa_error_to_errno (WSAGetLastError ()); #else if (rc == -1 && errno == EINTR) { errno = EINPROGRESS; diff --git a/src/tcp_listener.cpp b/src/tcp_listener.cpp index 338086d4..592bab5e 100644 --- a/src/tcp_listener.cpp +++ b/src/tcp_listener.cpp @@ -156,7 +156,7 @@ int zmq::tcp_listener_t::set_address (const char *addr_) s = open_socket (address.family (), SOCK_STREAM, IPPROTO_TCP); #ifdef ZMQ_HAVE_WINDOWS if (s == INVALID_SOCKET) - wsa_error_to_errno (); + errno = wsa_error_to_errno (WSAGetLastError ()); #endif // IPv6 address family not supported, try automatic downgrade to IPv4. @@ -170,7 +170,7 @@ int zmq::tcp_listener_t::set_address (const char *addr_) #ifdef ZMQ_HAVE_WINDOWS if (s == INVALID_SOCKET) { - wsa_error_to_errno (); + errno = wsa_error_to_errno (WSAGetLastError ()); return -1; } // On Windows, preventing sockets to be inherited by child processes. @@ -203,7 +203,7 @@ int zmq::tcp_listener_t::set_address (const char *addr_) rc = bind (s, address.addr (), address.addrlen ()); #ifdef ZMQ_HAVE_WINDOWS if (rc == SOCKET_ERROR) { - wsa_error_to_errno (); + errno = wsa_error_to_errno (WSAGetLastError ()); return -1; } #else @@ -215,7 +215,7 @@ int zmq::tcp_listener_t::set_address (const char *addr_) rc = listen (s, options.backlog); #ifdef ZMQ_HAVE_WINDOWS if (rc == SOCKET_ERROR) { - wsa_error_to_errno (); + errno = wsa_error_to_errno (WSAGetLastError ()); return -1; } #else diff --git a/src/zmq.cpp b/src/zmq.cpp index a9de5ecb..d7d6e04c 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -853,17 +853,15 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) #if defined ZMQ_HAVE_WINDOWS int rc = select (0, &inset, &outset, &errset, ptimeout); if (unlikely (rc == SOCKET_ERROR)) { - zmq::wsa_error_to_errno (); - if (errno == ENOTSOCK) - return -1; - wsa_assert (false); + errno = zmq::wsa_error_to_errno (WSAGetLastError ()); + wsa_assert (errno == ENOTSOCK); + return -1; } #else int rc = select (maxfd + 1, &inset, &outset, &errset, ptimeout); if (unlikely (rc == -1)) { - if (errno == EINTR || errno == EBADF) - return -1; - errno_assert (false); + errno_assert (errno == EINTR || errno == EBADF); + return -1; } #endif break; From e0534643fa65890145aabfe6423695b63b1d9bb4 Mon Sep 17 00:00:00 2001 From: Martin Hurton Date: Sun, 27 May 2012 23:35:32 +0200 Subject: [PATCH 5/7] Simplify error handling in tcp_connecter --- src/tcp_connecter.cpp | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/src/tcp_connecter.cpp b/src/tcp_connecter.cpp index f74931a8..504268a1 100644 --- a/src/tcp_connecter.cpp +++ b/src/tcp_connecter.cpp @@ -218,20 +218,17 @@ int zmq::tcp_connecter_t::open () if (rc == 0) return 0; - // Translate other error codes indicating asynchronous connect has been + // Translate error codes indicating asynchronous connect has been // launched to a uniform EINPROGRESS. #ifdef ZMQ_HAVE_WINDOWS - if (rc == SOCKET_ERROR && (WSAGetLastError () == WSAEINPROGRESS || - WSAGetLastError () == WSAEWOULDBLOCK)) { + const int error_code = WSAGetLastError (); + if (error_code == WSAEINPROGRESS || error_code == WSAEWOULDBLOCK) errno = EINPROGRESS; - return -1; - } - errno = wsa_error_to_errno (WSAGetLastError ()); + else + errno = wsa_error_to_errno (error_code); #else - if (rc == -1 && errno == EINTR) { + if (errno == EINTR) errno = EINPROGRESS; - return -1; - } #endif return -1; } From ac53f1a7284b85849e0a31fdec80e0f51293f3d0 Mon Sep 17 00:00:00 2001 From: Martin Hurton Date: Sun, 27 May 2012 22:51:56 +0200 Subject: [PATCH 6/7] Remove unnecessary casts --- src/pgm_socket.cpp | 2 +- src/stream_engine.cpp | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/pgm_socket.cpp b/src/pgm_socket.cpp index 0274ee4e..82dee0d3 100644 --- a/src/pgm_socket.cpp +++ b/src/pgm_socket.cpp @@ -454,7 +454,7 @@ size_t zmq::pgm_socket_t::send (unsigned char *data_, size_t data_len_) // We have to write all data as one packet. if (nbytes > 0) { zmq_assert (status == PGM_IO_STATUS_NORMAL); - zmq_assert ((ssize_t) nbytes == (ssize_t) data_len_); + zmq_assert (nbytes == data_len_); } else { zmq_assert (status == PGM_IO_STATUS_RATE_LIMITED || status == PGM_IO_STATUS_WOULD_BLOCK); diff --git a/src/stream_engine.cpp b/src/stream_engine.cpp index 8142ce5d..ea52fe32 100644 --- a/src/stream_engine.cpp +++ b/src/stream_engine.cpp @@ -302,7 +302,7 @@ int zmq::stream_engine_t::write (const void *data_, size_t size_) return -1; wsa_assert (nbytes != SOCKET_ERROR); - return (size_t) nbytes; + return nbytes; #else @@ -354,7 +354,7 @@ int zmq::stream_engine_t::read (void *data_, size_t size_) if (nbytes == 0) return -1; - return (size_t) nbytes; + return nbytes; #else From d0b9005ef002fce2de2fbbfa5d29fe2bef871cb5 Mon Sep 17 00:00:00 2001 From: Martin Hurton Date: Sun, 27 May 2012 23:26:14 +0200 Subject: [PATCH 7/7] Don't assume SOCKET_ERROR is -1 on Windows --- src/stream_engine.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/stream_engine.cpp b/src/stream_engine.cpp index ea52fe32..fa372c93 100644 --- a/src/stream_engine.cpp +++ b/src/stream_engine.cpp @@ -292,7 +292,7 @@ int zmq::stream_engine_t::write (const void *data_, size_t size_) return 0; // Signalise peer failure. - if (nbytes == -1 && ( + if (nbytes == SOCKET_ERROR && ( WSAGetLastError () == WSAENETDOWN || WSAGetLastError () == WSAENETRESET || WSAGetLastError () == WSAEHOSTUNREACH || @@ -338,7 +338,7 @@ int zmq::stream_engine_t::read (void *data_, size_t size_) return 0; // Connection failure. - if (nbytes == -1 && ( + if (nbytes == SOCKET_ERROR && ( WSAGetLastError () == WSAENETDOWN || WSAGetLastError () == WSAENETRESET || WSAGetLastError () == WSAECONNABORTED ||