diff --git a/.gitignore b/.gitignore index cc89fbfc..fee8796f 100644 --- a/.gitignore +++ b/.gitignore @@ -101,6 +101,10 @@ test_xpub_nodrop test_xpub_manual test_xpub_welcome_msg test_atomics +test_client_drop_more +test_client_server +test_server_drop_more +test_thread_safe tests/test*.log tests/test*.trs src/platform.hpp* diff --git a/CMakeLists.txt b/CMakeLists.txt index d75a1f9d..eb49a1e0 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -158,10 +158,6 @@ set(CMAKE_REQUIRED_INCLUDES ) add_definitions(-D_REENTRANT -D_THREAD_SAFE) -if(WIN32) - add_definitions(-DDLL_EXPORT) -endif() - option(ENABLE_EVENTFD "Enable/disable eventfd" ZMQ_HAVE_EVENTFD) macro(zmq_check_cxx_flag_prepend flag) @@ -342,14 +338,13 @@ set(LIBRARY_OUTPUT_PATH ${CMAKE_CURRENT_BINARY_DIR}/lib) #----------------------------------------------------------------------------- # platform specifics -if(MSVC) - add_definitions( - -DWIN32 - -DDLL_EXPORT - # NB: May require tweaking for highly connected applications. - -DFD_SETSIZE=4096 - -D_CRT_SECURE_NO_WARNINGS) +if (WIN32) + # NB: May require tweaking for highly connected applications. + add_definitions (-DFD_SETSIZE=4096) + add_definitions (-D_CRT_SECURE_NO_WARNINGS) +endif () +if(MSVC) # Parallel make. set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} /MP") @@ -585,7 +580,8 @@ if(MSVC) PUBLIC_HEADER "${public_headers}" RELEASE_POSTFIX "${_zmq_COMPILER}-mt-${ZMQ_VERSION_MAJOR}_${ZMQ_VERSION_MINOR}_${ZMQ_VERSION_PATCH}" DEBUG_POSTFIX "${_zmq_COMPILER}-mt-gd-${ZMQ_VERSION_MAJOR}_${ZMQ_VERSION_MINOR}_${ZMQ_VERSION_PATCH}" - RUNTIME_OUTPUT_DIRECTORY "${CMAKE_CURRENT_BINARY_DIR}/bin") + RUNTIME_OUTPUT_DIRECTORY "${CMAKE_CURRENT_BINARY_DIR}/bin" + COMPILE_DEFINITIONS "DLL_EXPORT") add_library(libzmq-static STATIC ${sources}) set_target_properties(libzmq-static PROPERTIES PUBLIC_HEADER "${public_headers}" @@ -595,16 +591,18 @@ if(MSVC) OUTPUT_NAME "libzmq") else() add_library(libzmq SHARED ${sources} ${public_headers} ${html-docs} ${readme-docs} ${zmq-pkgconfig}) + set_target_properties(libzmq PROPERTIES + COMPILE_DEFINITIONS "DLL_EXPORT" + PUBLIC_HEADER "${public_headers}" + VERSION ${ZMQ_VERSION} + SOVERSION "${ZMQ_VERSION_MAJOR}.${ZMQ_VERSION_MINOR}.0") if(ZMQ_BUILD_FRAMEWORK) set_target_properties(libzmq PROPERTIES FRAMEWORK TRUE OUTPUT_NAME "ZeroMQ" - PUBLIC_HEADER "${public_headers}" MACOSX_FRAMEWORK_IDENTIFIER "org.zeromq.libzmq" MACOSX_FRAMEWORK_SHORT_VERSION_STRING ${ZMQ_VERSION} - MACOSX_FRAMEWORK_BUNDLE_VERSION ${ZMQ_VERSION} - VERSION ${ZMQ_VERSION} - SOVERSION "${ZMQ_VERSION_MAJOR}.${ZMQ_VERSION_MINOR}.0") + MACOSX_FRAMEWORK_BUNDLE_VERSION ${ZMQ_VERSION}) set_source_files_properties(${html-docs} PROPERTIES MACOSX_PACKAGE_LOCATION doc) set_source_files_properties(${readme-docs} PROPERTIES @@ -613,13 +611,13 @@ else() MACOSX_PACKAGE_LOCATION lib/pkgconfig) else() set_target_properties(libzmq PROPERTIES - OUTPUT_NAME "zmq" - PUBLIC_HEADER "${public_headers}") + OUTPUT_NAME "zmq" + ) endif() add_library(libzmq-static STATIC ${sources} ${public_headers} ${html-docs} ${readme-docs} ${zmq-pkgconfig}) set_target_properties(libzmq-static PROPERTIES PUBLIC_HEADER "${public_headers}" - COMPILE_FLAGS "-DZMQ_STATIC" + COMPILE_DEFINITIONS "ZMQ_STATIC" OUTPUT_NAME "zmq-static") endif() @@ -721,11 +719,11 @@ endif() # DESTINATION include # COMPONENT SDK) -if(NOT ZMQ_BUILD_FRAMEWORK) - file(GLOB private_headers "${CMAKE_CURRENT_SOURCE_DIR}/src/*.hpp") - install(FILES ${sources} ${private_headers} DESTINATION src/zmq - COMPONENT SourceCode) -endif() +#if(NOT ZMQ_BUILD_FRAMEWORK) +# file(GLOB private_headers "${CMAKE_CURRENT_SOURCE_DIR}/src/*.hpp") +# install(FILES ${sources} ${private_headers} DESTINATION src/zmq +# COMPONENT SourceCode) +#endif() foreach(readme ${readme-docs}) configure_file(${CMAKE_CURRENT_SOURCE_DIR}/${readme} ${CMAKE_CURRENT_BINARY_DIR}/${readme}.txt) @@ -734,7 +732,7 @@ foreach(readme ${readme-docs}) if(MSVC) install(FILES ${CMAKE_CURRENT_BINARY_DIR}/${readme}.txt DESTINATION .) else() - install(FILES ${CMAKE_CURRENT_BINARY_DIR}/${readme}.txt DESTINATION etc/zmq) + install(FILES ${CMAKE_CURRENT_BINARY_DIR}/${readme}.txt DESTINATION share/zmq) endif() endif() endforeach() diff --git a/Makefile.am b/Makefile.am index 28eef125..e63749ad 100644 --- a/Makefile.am +++ b/Makefile.am @@ -353,7 +353,7 @@ test_apps = \ tests/test_atomics \ tests/test_client_server \ tests/test_server_drop_more \ - tests/test_client_drop_more \ + tests/test_client_drop_more \ tests/test_thread_safe tests_test_system_SOURCES = tests/test_system.cpp @@ -535,7 +535,7 @@ tests_test_server_drop_more_LDADD = src/libzmq.la tests_test_client_drop_more_SOURCES = tests/test_client_drop_more.cpp tests_test_client_drop_more_LDADD = src/libzmq.la -tests_test_thread_safe_SOURCES = tests/test_thread_safe.cpp +tests_test_thread_safe_SOURCES = tests/test_thread_safe.cpp tests_test_thread_safe_LDADD = src/libzmq.la diff --git a/builds/cmake/Modules/FindAsciiDoc.cmake b/builds/cmake/Modules/FindAsciiDoc.cmake index db4e418c..049ac007 100644 --- a/builds/cmake/Modules/FindAsciiDoc.cmake +++ b/builds/cmake/Modules/FindAsciiDoc.cmake @@ -6,19 +6,21 @@ # A2X_EXECUTABLE - the full path to a2x # A2X_FOUND - If false, don't attempt to use a2x. +set (PROGRAMFILESX86 "PROGRAMFILES(X86)") + find_program(ASCIIDOC_EXECUTABLE asciidoc asciidoc.py PATHS "$ENV{ASCIIDOC_ROOT}" "$ENV{PROGRAMW6432}/asciidoc" "$ENV{PROGRAMFILES}/asciidoc" - "$ENV{PROGRAMFILES(X86)}/asciidoc") + "$ENV{${PROGRAMFILESX86}}/asciidoc") find_program(A2X_EXECUTABLE a2x PATHS "$ENV{ASCIIDOC_ROOT}" "$ENV{PROGRAMW6432}/asciidoc" "$ENV{PROGRAMFILES}/asciidoc" - "$ENV{PROGRAMFILES(X86)}/asciidoc") + "$ENV{${PROGRAMFILESX86}}/asciidoc") include(FindPackageHandleStandardArgs) find_package_handle_standard_ARGS(AsciiDoc REQUIRED_VARS ASCIIDOC_EXECUTABLE) -mark_as_advanced(ASCIIDOC_EXECUTABLE A2X_EXECUTABLE) \ No newline at end of file +mark_as_advanced(ASCIIDOC_EXECUTABLE A2X_EXECUTABLE) diff --git a/builds/msvc/properties/ReleaseDEXE.props b/builds/msvc/properties/ReleaseDEXE.props index b89b373a..73deeae9 100644 --- a/builds/msvc/properties/ReleaseDEXE.props +++ b/builds/msvc/properties/ReleaseDEXE.props @@ -13,7 +13,7 @@ - MultiThreadedDebugDLL + MultiThreadedDLL diff --git a/doc/zmq_ctx_term.txt b/doc/zmq_ctx_term.txt index 42c5eb08..79aad1c5 100644 --- a/doc/zmq_ctx_term.txt +++ b/doc/zmq_ctx_term.txt @@ -4,7 +4,7 @@ zmq_ctx_term(3) NAME ---- -zmq_ctx_term - destroy a 0MQ context +zmq_ctx_term - terminate a 0MQ context SYNOPSIS @@ -36,7 +36,8 @@ Context termination is performed in the following steps: For further details regarding socket linger behaviour refer to the _ZMQ_LINGER_ option in linkzmq:zmq_setsockopt[3]. -This function replaces the deprecated function linkzmq:zmq_term[3]. +This function replaces the deprecated functions linkzmq:zmq_term[3] and +linkzmq:zmq_ctx_destroy[3]. RETURN VALUE diff --git a/doc/zmq_msg_init.txt b/doc/zmq_msg_init.txt index 76ce38b8..27ea4b2a 100644 --- a/doc/zmq_msg_init.txt +++ b/doc/zmq_msg_init.txt @@ -28,8 +28,7 @@ _zmq_msg_init_size()_ are mutually exclusive. Never initialise the same RETURN VALUE ------------ -The _zmq_msg_init()_ function shall return zero if successful. Otherwise it -shall return `-1` and set 'errno' to one of the values defined below. +The _zmq_msg_init()_ function always returns zero. ERRORS diff --git a/doc/zmq_unbind.txt b/doc/zmq_unbind.txt index 545afd07..84c9f99e 100644 --- a/doc/zmq_unbind.txt +++ b/doc/zmq_unbind.txt @@ -40,6 +40,8 @@ The endpoint supplied is invalid. The 0MQ 'context' associated with the specified 'socket' was terminated. *ENOTSOCK*:: The provided 'socket' was invalid. +*ENOENT*:: +The endpoint supplied was not previously bound. EXAMPLES diff --git a/src/pipe.cpp b/src/pipe.cpp index 13e00c9f..a58a6f2d 100644 --- a/src/pipe.cpp +++ b/src/pipe.cpp @@ -267,6 +267,8 @@ void zmq::pipe_t::process_hiccup (void *pipe_) outpipe->flush (); msg_t msg; while (outpipe->read (&msg)) { + if (!(msg.flags () & msg_t::more)) + msgs_written--; int rc = msg.close (); errno_assert (rc == 0); } diff --git a/src/stream.cpp b/src/stream.cpp index be53d9de..8166fe22 100644 --- a/src/stream.cpp +++ b/src/stream.cpp @@ -176,7 +176,7 @@ int zmq::stream_t::xsetsockopt (int option_, const void *optval_, return 0; } break; - + case ZMQ_STREAM_NOTIFY: if (is_int && (value == 0 || value == 1)) { options.raw_notify = (value != 0); @@ -221,6 +221,12 @@ int zmq::stream_t::xrecv (msg_t *msg_) blob_t identity = pipe->get_identity (); rc = msg_->init_size (identity.size ()); errno_assert (rc == 0); + + // forward metadata (if any) + metadata_t *metadata = prefetched_msg.metadata(); + if (metadata) + msg_->set_metadata(metadata); + memcpy (msg_->data (), identity.data (), identity.size ()); msg_->set_flags (msg_t::more); @@ -249,6 +255,12 @@ bool zmq::stream_t::xhas_in () blob_t identity = pipe->get_identity (); rc = prefetched_id.init_size (identity.size ()); errno_assert (rc == 0); + + // forward metadata (if any) + metadata_t *metadata = prefetched_msg.metadata(); + if (metadata) + prefetched_id.set_metadata(metadata); + memcpy (prefetched_id.data (), identity.data (), identity.size ()); prefetched_id.set_flags (msg_t::more); @@ -277,7 +289,7 @@ void zmq::stream_t::identify_peer (pipe_t *pipe_) connect_rid.length ()); connect_rid.clear (); outpipes_t::iterator it = outpipes.find (identity); - if (it != outpipes.end ()) + if (it != outpipes.end ()) zmq_assert(false); } else { diff --git a/src/stream_engine.cpp b/src/stream_engine.cpp index 82f5ad0a..7f322b38 100644 --- a/src/stream_engine.cpp +++ b/src/stream_engine.cpp @@ -36,7 +36,6 @@ #include #include #include -#include #include "stream_engine.hpp" #include "io_thread.hpp" @@ -192,14 +191,21 @@ void zmq::stream_engine_t::plug (io_thread_t *io_thread_, handshaking = false; next_msg = &stream_engine_t::pull_msg_from_session; - process_msg = &stream_engine_t::push_msg_to_session; + process_msg = &stream_engine_t::push_raw_msg_to_session; + + properties_t properties; + if (init_properties(properties)) { + // Compile metadata. + zmq_assert (metadata == NULL); + metadata = new (std::nothrow) metadata_t (properties); + } if (options.raw_notify) { // For raw sockets, send an initial 0-length message to the // application so that it knows a peer has connected. msg_t connector; connector.init(); - push_msg_to_session (&connector); + push_raw_msg_to_session (&connector); connector.close(); session->flush (); } @@ -804,13 +810,8 @@ void zmq::stream_engine_t::mechanism_ready () process_msg = &stream_engine_t::write_credential; // Compile metadata. - typedef metadata_t::dict_t properties_t; properties_t properties; - - // If we have a peer_address, add it to metadata - if (!peer_address.empty()) { - properties.insert(std::make_pair("Peer-Address", peer_address)); - } + init_properties(properties); // Add ZAP properties. const properties_t& zap_properties = mechanism->get_zap_properties (); @@ -835,6 +836,12 @@ int zmq::stream_engine_t::push_msg_to_session (msg_t *msg_) return session->push_msg (msg_); } +int zmq::stream_engine_t::push_raw_msg_to_session (msg_t *msg_) { + if (metadata) + msg_->set_metadata(metadata); + return push_msg_to_session(msg_); +} + int zmq::stream_engine_t::write_credential (msg_t *msg_) { zmq_assert (mechanism != NULL); @@ -938,6 +945,12 @@ void zmq::stream_engine_t::set_handshake_timer () } } +bool zmq::stream_engine_t::init_properties (properties_t & properties) { + if (peer_address.empty()) return false; + properties.insert (std::make_pair("Peer-Address", peer_address)); + return true; +} + void zmq::stream_engine_t::timer_event (int id_) { zmq_assert (id_ == handshake_timer_id); diff --git a/src/stream_engine.hpp b/src/stream_engine.hpp index 140f13a6..df91357f 100644 --- a/src/stream_engine.hpp +++ b/src/stream_engine.hpp @@ -59,7 +59,7 @@ namespace zmq timeout_error }; - stream_engine_t (fd_t fd_, const options_t &options_, + stream_engine_t (fd_t fd_, const options_t &options_, const std::string &endpoint); ~stream_engine_t (); @@ -77,7 +77,6 @@ namespace zmq void timer_event (int id_); private: - // Unplug the engine from the session. void unplug (); @@ -99,6 +98,8 @@ namespace zmq int pull_msg_from_session (msg_t *msg_); int push_msg_to_session (msg_t *msg); + int push_raw_msg_to_session (msg_t *msg); + int write_credential (msg_t *msg_); int pull_and_encode (msg_t *msg_); int decode_and_push (msg_t *msg_); @@ -113,6 +114,9 @@ namespace zmq void set_handshake_timer(); + typedef metadata_t::dict_t properties_t; + bool init_properties (properties_t & properties); + // Underlying socket. fd_t s; diff --git a/tests/test_stream.cpp b/tests/test_stream.cpp index 20a03ca5..dad2f9a3 100644 --- a/tests/test_stream.cpp +++ b/tests/test_stream.cpp @@ -80,17 +80,32 @@ test_stream_to_dealer (void) assert (rc > 0); assert (zmq_msg_more (&identity)); + // Verify the existence of Peer-Address metadata + char const* peer_address = zmq_msg_gets (&identity, "Peer-Address"); + assert (peer_address != 0); + assert (streq (peer_address, "127.0.0.1")); + // Second frame is zero byte buffer [255]; rc = zmq_recv (stream, buffer, 255, 0); assert (rc == 0); - + + // Verify the existence of Peer-Address metadata + peer_address = zmq_msg_gets (&identity, "Peer-Address"); + assert (peer_address != 0); + assert (streq (peer_address, "127.0.0.1")); + // Real data follows // First frame is identity rc = zmq_msg_recv (&identity, stream, 0); assert (rc > 0); assert (zmq_msg_more (&identity)); + // Verify the existence of Peer-Address metadata + peer_address = zmq_msg_gets (&identity, "Peer-Address"); + assert (peer_address != 0); + assert (streq (peer_address, "127.0.0.1")); + // Second frame is greeting signature rc = zmq_recv (stream, buffer, 255, 0); assert (rc == 10); @@ -182,7 +197,7 @@ test_stream_to_stream (void) // Set-up our context and sockets void *ctx = zmq_ctx_new (); assert (ctx); - + void *server = zmq_socket (ctx, ZMQ_STREAM); assert (server); int enabled = 1; @@ -200,7 +215,7 @@ test_stream_to_stream (void) uint8_t id [256]; size_t id_size = 256; uint8_t buffer [256]; - + // Connecting sends a zero message // Server: First frame is identity, second frame is zero id_size = zmq_recv (server, id, 256, 0); @@ -223,19 +238,19 @@ test_stream_to_stream (void) // Second frame is HTTP GET request rc = zmq_send (client, "GET /\n\n", 7, 0); assert (rc == 7); - + // Get HTTP request; ID frame and then request id_size = zmq_recv (server, id, 256, 0); assert (id_size > 0); rc = zmq_recv (server, buffer, 256, 0); assert (rc != -1); assert (memcmp (buffer, "GET /\n\n", 7) == 0); - + // Send reply back to client char http_response [] = - "HTTP/1.0 200 OK\r\n" - "Content-Type: text/plain\r\n" - "\r\n" + "HTTP/1.0 200 OK\r\n" + "Content-Type: text/plain\r\n" + "\r\n" "Hello, World!"; rc = zmq_send (server, id, id_size, ZMQ_SNDMORE); assert (rc != -1);