mirror of
https://github.com/zeromq/libzmq.git
synced 2025-03-10 07:56:09 +00:00
Merge branch 'master' of github.com:zeromq/libzmq
This commit is contained in:
commit
41d85f52d2
4
.gitignore
vendored
4
.gitignore
vendored
@ -101,6 +101,10 @@ test_xpub_nodrop
|
|||||||
test_xpub_manual
|
test_xpub_manual
|
||||||
test_xpub_welcome_msg
|
test_xpub_welcome_msg
|
||||||
test_atomics
|
test_atomics
|
||||||
|
test_client_drop_more
|
||||||
|
test_client_server
|
||||||
|
test_server_drop_more
|
||||||
|
test_thread_safe
|
||||||
tests/test*.log
|
tests/test*.log
|
||||||
tests/test*.trs
|
tests/test*.trs
|
||||||
src/platform.hpp*
|
src/platform.hpp*
|
||||||
|
@ -158,10 +158,6 @@ set(CMAKE_REQUIRED_INCLUDES )
|
|||||||
|
|
||||||
add_definitions(-D_REENTRANT -D_THREAD_SAFE)
|
add_definitions(-D_REENTRANT -D_THREAD_SAFE)
|
||||||
|
|
||||||
if(WIN32)
|
|
||||||
add_definitions(-DDLL_EXPORT)
|
|
||||||
endif()
|
|
||||||
|
|
||||||
option(ENABLE_EVENTFD "Enable/disable eventfd" ZMQ_HAVE_EVENTFD)
|
option(ENABLE_EVENTFD "Enable/disable eventfd" ZMQ_HAVE_EVENTFD)
|
||||||
|
|
||||||
macro(zmq_check_cxx_flag_prepend flag)
|
macro(zmq_check_cxx_flag_prepend flag)
|
||||||
@ -342,14 +338,13 @@ set(LIBRARY_OUTPUT_PATH ${CMAKE_CURRENT_BINARY_DIR}/lib)
|
|||||||
#-----------------------------------------------------------------------------
|
#-----------------------------------------------------------------------------
|
||||||
# platform specifics
|
# platform specifics
|
||||||
|
|
||||||
if(MSVC)
|
if (WIN32)
|
||||||
add_definitions(
|
# NB: May require tweaking for highly connected applications.
|
||||||
-DWIN32
|
add_definitions (-DFD_SETSIZE=4096)
|
||||||
-DDLL_EXPORT
|
add_definitions (-D_CRT_SECURE_NO_WARNINGS)
|
||||||
# NB: May require tweaking for highly connected applications.
|
endif ()
|
||||||
-DFD_SETSIZE=4096
|
|
||||||
-D_CRT_SECURE_NO_WARNINGS)
|
|
||||||
|
|
||||||
|
if(MSVC)
|
||||||
# Parallel make.
|
# Parallel make.
|
||||||
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} /MP")
|
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} /MP")
|
||||||
|
|
||||||
@ -585,7 +580,8 @@ if(MSVC)
|
|||||||
PUBLIC_HEADER "${public_headers}"
|
PUBLIC_HEADER "${public_headers}"
|
||||||
RELEASE_POSTFIX "${_zmq_COMPILER}-mt-${ZMQ_VERSION_MAJOR}_${ZMQ_VERSION_MINOR}_${ZMQ_VERSION_PATCH}"
|
RELEASE_POSTFIX "${_zmq_COMPILER}-mt-${ZMQ_VERSION_MAJOR}_${ZMQ_VERSION_MINOR}_${ZMQ_VERSION_PATCH}"
|
||||||
DEBUG_POSTFIX "${_zmq_COMPILER}-mt-gd-${ZMQ_VERSION_MAJOR}_${ZMQ_VERSION_MINOR}_${ZMQ_VERSION_PATCH}"
|
DEBUG_POSTFIX "${_zmq_COMPILER}-mt-gd-${ZMQ_VERSION_MAJOR}_${ZMQ_VERSION_MINOR}_${ZMQ_VERSION_PATCH}"
|
||||||
RUNTIME_OUTPUT_DIRECTORY "${CMAKE_CURRENT_BINARY_DIR}/bin")
|
RUNTIME_OUTPUT_DIRECTORY "${CMAKE_CURRENT_BINARY_DIR}/bin"
|
||||||
|
COMPILE_DEFINITIONS "DLL_EXPORT")
|
||||||
add_library(libzmq-static STATIC ${sources})
|
add_library(libzmq-static STATIC ${sources})
|
||||||
set_target_properties(libzmq-static PROPERTIES
|
set_target_properties(libzmq-static PROPERTIES
|
||||||
PUBLIC_HEADER "${public_headers}"
|
PUBLIC_HEADER "${public_headers}"
|
||||||
@ -595,16 +591,18 @@ if(MSVC)
|
|||||||
OUTPUT_NAME "libzmq")
|
OUTPUT_NAME "libzmq")
|
||||||
else()
|
else()
|
||||||
add_library(libzmq SHARED ${sources} ${public_headers} ${html-docs} ${readme-docs} ${zmq-pkgconfig})
|
add_library(libzmq SHARED ${sources} ${public_headers} ${html-docs} ${readme-docs} ${zmq-pkgconfig})
|
||||||
|
set_target_properties(libzmq PROPERTIES
|
||||||
|
COMPILE_DEFINITIONS "DLL_EXPORT"
|
||||||
|
PUBLIC_HEADER "${public_headers}"
|
||||||
|
VERSION ${ZMQ_VERSION}
|
||||||
|
SOVERSION "${ZMQ_VERSION_MAJOR}.${ZMQ_VERSION_MINOR}.0")
|
||||||
if(ZMQ_BUILD_FRAMEWORK)
|
if(ZMQ_BUILD_FRAMEWORK)
|
||||||
set_target_properties(libzmq PROPERTIES
|
set_target_properties(libzmq PROPERTIES
|
||||||
FRAMEWORK TRUE
|
FRAMEWORK TRUE
|
||||||
OUTPUT_NAME "ZeroMQ"
|
OUTPUT_NAME "ZeroMQ"
|
||||||
PUBLIC_HEADER "${public_headers}"
|
|
||||||
MACOSX_FRAMEWORK_IDENTIFIER "org.zeromq.libzmq"
|
MACOSX_FRAMEWORK_IDENTIFIER "org.zeromq.libzmq"
|
||||||
MACOSX_FRAMEWORK_SHORT_VERSION_STRING ${ZMQ_VERSION}
|
MACOSX_FRAMEWORK_SHORT_VERSION_STRING ${ZMQ_VERSION}
|
||||||
MACOSX_FRAMEWORK_BUNDLE_VERSION ${ZMQ_VERSION}
|
MACOSX_FRAMEWORK_BUNDLE_VERSION ${ZMQ_VERSION})
|
||||||
VERSION ${ZMQ_VERSION}
|
|
||||||
SOVERSION "${ZMQ_VERSION_MAJOR}.${ZMQ_VERSION_MINOR}.0")
|
|
||||||
set_source_files_properties(${html-docs} PROPERTIES
|
set_source_files_properties(${html-docs} PROPERTIES
|
||||||
MACOSX_PACKAGE_LOCATION doc)
|
MACOSX_PACKAGE_LOCATION doc)
|
||||||
set_source_files_properties(${readme-docs} PROPERTIES
|
set_source_files_properties(${readme-docs} PROPERTIES
|
||||||
@ -613,13 +611,13 @@ else()
|
|||||||
MACOSX_PACKAGE_LOCATION lib/pkgconfig)
|
MACOSX_PACKAGE_LOCATION lib/pkgconfig)
|
||||||
else()
|
else()
|
||||||
set_target_properties(libzmq PROPERTIES
|
set_target_properties(libzmq PROPERTIES
|
||||||
OUTPUT_NAME "zmq"
|
OUTPUT_NAME "zmq"
|
||||||
PUBLIC_HEADER "${public_headers}")
|
)
|
||||||
endif()
|
endif()
|
||||||
add_library(libzmq-static STATIC ${sources} ${public_headers} ${html-docs} ${readme-docs} ${zmq-pkgconfig})
|
add_library(libzmq-static STATIC ${sources} ${public_headers} ${html-docs} ${readme-docs} ${zmq-pkgconfig})
|
||||||
set_target_properties(libzmq-static PROPERTIES
|
set_target_properties(libzmq-static PROPERTIES
|
||||||
PUBLIC_HEADER "${public_headers}"
|
PUBLIC_HEADER "${public_headers}"
|
||||||
COMPILE_FLAGS "-DZMQ_STATIC"
|
COMPILE_DEFINITIONS "ZMQ_STATIC"
|
||||||
OUTPUT_NAME "zmq-static")
|
OUTPUT_NAME "zmq-static")
|
||||||
endif()
|
endif()
|
||||||
|
|
||||||
@ -721,11 +719,11 @@ endif()
|
|||||||
# DESTINATION include
|
# DESTINATION include
|
||||||
# COMPONENT SDK)
|
# COMPONENT SDK)
|
||||||
|
|
||||||
if(NOT ZMQ_BUILD_FRAMEWORK)
|
#if(NOT ZMQ_BUILD_FRAMEWORK)
|
||||||
file(GLOB private_headers "${CMAKE_CURRENT_SOURCE_DIR}/src/*.hpp")
|
# file(GLOB private_headers "${CMAKE_CURRENT_SOURCE_DIR}/src/*.hpp")
|
||||||
install(FILES ${sources} ${private_headers} DESTINATION src/zmq
|
# install(FILES ${sources} ${private_headers} DESTINATION src/zmq
|
||||||
COMPONENT SourceCode)
|
# COMPONENT SourceCode)
|
||||||
endif()
|
#endif()
|
||||||
|
|
||||||
foreach(readme ${readme-docs})
|
foreach(readme ${readme-docs})
|
||||||
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/${readme} ${CMAKE_CURRENT_BINARY_DIR}/${readme}.txt)
|
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/${readme} ${CMAKE_CURRENT_BINARY_DIR}/${readme}.txt)
|
||||||
@ -734,7 +732,7 @@ foreach(readme ${readme-docs})
|
|||||||
if(MSVC)
|
if(MSVC)
|
||||||
install(FILES ${CMAKE_CURRENT_BINARY_DIR}/${readme}.txt DESTINATION .)
|
install(FILES ${CMAKE_CURRENT_BINARY_DIR}/${readme}.txt DESTINATION .)
|
||||||
else()
|
else()
|
||||||
install(FILES ${CMAKE_CURRENT_BINARY_DIR}/${readme}.txt DESTINATION etc/zmq)
|
install(FILES ${CMAKE_CURRENT_BINARY_DIR}/${readme}.txt DESTINATION share/zmq)
|
||||||
endif()
|
endif()
|
||||||
endif()
|
endif()
|
||||||
endforeach()
|
endforeach()
|
||||||
|
@ -353,7 +353,7 @@ test_apps = \
|
|||||||
tests/test_atomics \
|
tests/test_atomics \
|
||||||
tests/test_client_server \
|
tests/test_client_server \
|
||||||
tests/test_server_drop_more \
|
tests/test_server_drop_more \
|
||||||
tests/test_client_drop_more \
|
tests/test_client_drop_more \
|
||||||
tests/test_thread_safe
|
tests/test_thread_safe
|
||||||
|
|
||||||
tests_test_system_SOURCES = tests/test_system.cpp
|
tests_test_system_SOURCES = tests/test_system.cpp
|
||||||
@ -535,7 +535,7 @@ tests_test_server_drop_more_LDADD = src/libzmq.la
|
|||||||
tests_test_client_drop_more_SOURCES = tests/test_client_drop_more.cpp
|
tests_test_client_drop_more_SOURCES = tests/test_client_drop_more.cpp
|
||||||
tests_test_client_drop_more_LDADD = src/libzmq.la
|
tests_test_client_drop_more_LDADD = src/libzmq.la
|
||||||
|
|
||||||
tests_test_thread_safe_SOURCES = tests/test_thread_safe.cpp
|
tests_test_thread_safe_SOURCES = tests/test_thread_safe.cpp
|
||||||
tests_test_thread_safe_LDADD = src/libzmq.la
|
tests_test_thread_safe_LDADD = src/libzmq.la
|
||||||
|
|
||||||
|
|
||||||
|
@ -6,19 +6,21 @@
|
|||||||
# A2X_EXECUTABLE - the full path to a2x
|
# A2X_EXECUTABLE - the full path to a2x
|
||||||
# A2X_FOUND - If false, don't attempt to use a2x.
|
# A2X_FOUND - If false, don't attempt to use a2x.
|
||||||
|
|
||||||
|
set (PROGRAMFILESX86 "PROGRAMFILES(X86)")
|
||||||
|
|
||||||
find_program(ASCIIDOC_EXECUTABLE asciidoc asciidoc.py
|
find_program(ASCIIDOC_EXECUTABLE asciidoc asciidoc.py
|
||||||
PATHS "$ENV{ASCIIDOC_ROOT}"
|
PATHS "$ENV{ASCIIDOC_ROOT}"
|
||||||
"$ENV{PROGRAMW6432}/asciidoc"
|
"$ENV{PROGRAMW6432}/asciidoc"
|
||||||
"$ENV{PROGRAMFILES}/asciidoc"
|
"$ENV{PROGRAMFILES}/asciidoc"
|
||||||
"$ENV{PROGRAMFILES(X86)}/asciidoc")
|
"$ENV{${PROGRAMFILESX86}}/asciidoc")
|
||||||
|
|
||||||
find_program(A2X_EXECUTABLE a2x
|
find_program(A2X_EXECUTABLE a2x
|
||||||
PATHS "$ENV{ASCIIDOC_ROOT}"
|
PATHS "$ENV{ASCIIDOC_ROOT}"
|
||||||
"$ENV{PROGRAMW6432}/asciidoc"
|
"$ENV{PROGRAMW6432}/asciidoc"
|
||||||
"$ENV{PROGRAMFILES}/asciidoc"
|
"$ENV{PROGRAMFILES}/asciidoc"
|
||||||
"$ENV{PROGRAMFILES(X86)}/asciidoc")
|
"$ENV{${PROGRAMFILESX86}}/asciidoc")
|
||||||
|
|
||||||
|
|
||||||
include(FindPackageHandleStandardArgs)
|
include(FindPackageHandleStandardArgs)
|
||||||
find_package_handle_standard_ARGS(AsciiDoc REQUIRED_VARS ASCIIDOC_EXECUTABLE)
|
find_package_handle_standard_ARGS(AsciiDoc REQUIRED_VARS ASCIIDOC_EXECUTABLE)
|
||||||
mark_as_advanced(ASCIIDOC_EXECUTABLE A2X_EXECUTABLE)
|
mark_as_advanced(ASCIIDOC_EXECUTABLE A2X_EXECUTABLE)
|
||||||
|
@ -13,7 +13,7 @@
|
|||||||
|
|
||||||
<ItemDefinitionGroup>
|
<ItemDefinitionGroup>
|
||||||
<ClCompile>
|
<ClCompile>
|
||||||
<RuntimeLibrary>MultiThreadedDebugDLL</RuntimeLibrary>
|
<RuntimeLibrary>MultiThreadedDLL</RuntimeLibrary>
|
||||||
</ClCompile>
|
</ClCompile>
|
||||||
</ItemDefinitionGroup>
|
</ItemDefinitionGroup>
|
||||||
|
|
||||||
|
@ -4,7 +4,7 @@ zmq_ctx_term(3)
|
|||||||
|
|
||||||
NAME
|
NAME
|
||||||
----
|
----
|
||||||
zmq_ctx_term - destroy a 0MQ context
|
zmq_ctx_term - terminate a 0MQ context
|
||||||
|
|
||||||
|
|
||||||
SYNOPSIS
|
SYNOPSIS
|
||||||
@ -36,7 +36,8 @@ Context termination is performed in the following steps:
|
|||||||
For further details regarding socket linger behaviour refer to the _ZMQ_LINGER_
|
For further details regarding socket linger behaviour refer to the _ZMQ_LINGER_
|
||||||
option in linkzmq:zmq_setsockopt[3].
|
option in linkzmq:zmq_setsockopt[3].
|
||||||
|
|
||||||
This function replaces the deprecated function linkzmq:zmq_term[3].
|
This function replaces the deprecated functions linkzmq:zmq_term[3] and
|
||||||
|
linkzmq:zmq_ctx_destroy[3].
|
||||||
|
|
||||||
|
|
||||||
RETURN VALUE
|
RETURN VALUE
|
||||||
|
@ -28,8 +28,7 @@ _zmq_msg_init_size()_ are mutually exclusive. Never initialise the same
|
|||||||
|
|
||||||
RETURN VALUE
|
RETURN VALUE
|
||||||
------------
|
------------
|
||||||
The _zmq_msg_init()_ function shall return zero if successful. Otherwise it
|
The _zmq_msg_init()_ function always returns zero.
|
||||||
shall return `-1` and set 'errno' to one of the values defined below.
|
|
||||||
|
|
||||||
|
|
||||||
ERRORS
|
ERRORS
|
||||||
|
@ -40,6 +40,8 @@ The endpoint supplied is invalid.
|
|||||||
The 0MQ 'context' associated with the specified 'socket' was terminated.
|
The 0MQ 'context' associated with the specified 'socket' was terminated.
|
||||||
*ENOTSOCK*::
|
*ENOTSOCK*::
|
||||||
The provided 'socket' was invalid.
|
The provided 'socket' was invalid.
|
||||||
|
*ENOENT*::
|
||||||
|
The endpoint supplied was not previously bound.
|
||||||
|
|
||||||
|
|
||||||
EXAMPLES
|
EXAMPLES
|
||||||
|
@ -267,6 +267,8 @@ void zmq::pipe_t::process_hiccup (void *pipe_)
|
|||||||
outpipe->flush ();
|
outpipe->flush ();
|
||||||
msg_t msg;
|
msg_t msg;
|
||||||
while (outpipe->read (&msg)) {
|
while (outpipe->read (&msg)) {
|
||||||
|
if (!(msg.flags () & msg_t::more))
|
||||||
|
msgs_written--;
|
||||||
int rc = msg.close ();
|
int rc = msg.close ();
|
||||||
errno_assert (rc == 0);
|
errno_assert (rc == 0);
|
||||||
}
|
}
|
||||||
|
@ -176,7 +176,7 @@ int zmq::stream_t::xsetsockopt (int option_, const void *optval_,
|
|||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
|
|
||||||
case ZMQ_STREAM_NOTIFY:
|
case ZMQ_STREAM_NOTIFY:
|
||||||
if (is_int && (value == 0 || value == 1)) {
|
if (is_int && (value == 0 || value == 1)) {
|
||||||
options.raw_notify = (value != 0);
|
options.raw_notify = (value != 0);
|
||||||
@ -221,6 +221,12 @@ int zmq::stream_t::xrecv (msg_t *msg_)
|
|||||||
blob_t identity = pipe->get_identity ();
|
blob_t identity = pipe->get_identity ();
|
||||||
rc = msg_->init_size (identity.size ());
|
rc = msg_->init_size (identity.size ());
|
||||||
errno_assert (rc == 0);
|
errno_assert (rc == 0);
|
||||||
|
|
||||||
|
// forward metadata (if any)
|
||||||
|
metadata_t *metadata = prefetched_msg.metadata();
|
||||||
|
if (metadata)
|
||||||
|
msg_->set_metadata(metadata);
|
||||||
|
|
||||||
memcpy (msg_->data (), identity.data (), identity.size ());
|
memcpy (msg_->data (), identity.data (), identity.size ());
|
||||||
msg_->set_flags (msg_t::more);
|
msg_->set_flags (msg_t::more);
|
||||||
|
|
||||||
@ -249,6 +255,12 @@ bool zmq::stream_t::xhas_in ()
|
|||||||
blob_t identity = pipe->get_identity ();
|
blob_t identity = pipe->get_identity ();
|
||||||
rc = prefetched_id.init_size (identity.size ());
|
rc = prefetched_id.init_size (identity.size ());
|
||||||
errno_assert (rc == 0);
|
errno_assert (rc == 0);
|
||||||
|
|
||||||
|
// forward metadata (if any)
|
||||||
|
metadata_t *metadata = prefetched_msg.metadata();
|
||||||
|
if (metadata)
|
||||||
|
prefetched_id.set_metadata(metadata);
|
||||||
|
|
||||||
memcpy (prefetched_id.data (), identity.data (), identity.size ());
|
memcpy (prefetched_id.data (), identity.data (), identity.size ());
|
||||||
prefetched_id.set_flags (msg_t::more);
|
prefetched_id.set_flags (msg_t::more);
|
||||||
|
|
||||||
@ -277,7 +289,7 @@ void zmq::stream_t::identify_peer (pipe_t *pipe_)
|
|||||||
connect_rid.length ());
|
connect_rid.length ());
|
||||||
connect_rid.clear ();
|
connect_rid.clear ();
|
||||||
outpipes_t::iterator it = outpipes.find (identity);
|
outpipes_t::iterator it = outpipes.find (identity);
|
||||||
if (it != outpipes.end ())
|
if (it != outpipes.end ())
|
||||||
zmq_assert(false);
|
zmq_assert(false);
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
|
@ -36,7 +36,6 @@
|
|||||||
#include <string.h>
|
#include <string.h>
|
||||||
#include <new>
|
#include <new>
|
||||||
#include <sstream>
|
#include <sstream>
|
||||||
#include <iostream>
|
|
||||||
|
|
||||||
#include "stream_engine.hpp"
|
#include "stream_engine.hpp"
|
||||||
#include "io_thread.hpp"
|
#include "io_thread.hpp"
|
||||||
@ -192,14 +191,21 @@ void zmq::stream_engine_t::plug (io_thread_t *io_thread_,
|
|||||||
handshaking = false;
|
handshaking = false;
|
||||||
|
|
||||||
next_msg = &stream_engine_t::pull_msg_from_session;
|
next_msg = &stream_engine_t::pull_msg_from_session;
|
||||||
process_msg = &stream_engine_t::push_msg_to_session;
|
process_msg = &stream_engine_t::push_raw_msg_to_session;
|
||||||
|
|
||||||
|
properties_t properties;
|
||||||
|
if (init_properties(properties)) {
|
||||||
|
// Compile metadata.
|
||||||
|
zmq_assert (metadata == NULL);
|
||||||
|
metadata = new (std::nothrow) metadata_t (properties);
|
||||||
|
}
|
||||||
|
|
||||||
if (options.raw_notify) {
|
if (options.raw_notify) {
|
||||||
// For raw sockets, send an initial 0-length message to the
|
// For raw sockets, send an initial 0-length message to the
|
||||||
// application so that it knows a peer has connected.
|
// application so that it knows a peer has connected.
|
||||||
msg_t connector;
|
msg_t connector;
|
||||||
connector.init();
|
connector.init();
|
||||||
push_msg_to_session (&connector);
|
push_raw_msg_to_session (&connector);
|
||||||
connector.close();
|
connector.close();
|
||||||
session->flush ();
|
session->flush ();
|
||||||
}
|
}
|
||||||
@ -804,13 +810,8 @@ void zmq::stream_engine_t::mechanism_ready ()
|
|||||||
process_msg = &stream_engine_t::write_credential;
|
process_msg = &stream_engine_t::write_credential;
|
||||||
|
|
||||||
// Compile metadata.
|
// Compile metadata.
|
||||||
typedef metadata_t::dict_t properties_t;
|
|
||||||
properties_t properties;
|
properties_t properties;
|
||||||
|
init_properties(properties);
|
||||||
// If we have a peer_address, add it to metadata
|
|
||||||
if (!peer_address.empty()) {
|
|
||||||
properties.insert(std::make_pair("Peer-Address", peer_address));
|
|
||||||
}
|
|
||||||
|
|
||||||
// Add ZAP properties.
|
// Add ZAP properties.
|
||||||
const properties_t& zap_properties = mechanism->get_zap_properties ();
|
const properties_t& zap_properties = mechanism->get_zap_properties ();
|
||||||
@ -835,6 +836,12 @@ int zmq::stream_engine_t::push_msg_to_session (msg_t *msg_)
|
|||||||
return session->push_msg (msg_);
|
return session->push_msg (msg_);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int zmq::stream_engine_t::push_raw_msg_to_session (msg_t *msg_) {
|
||||||
|
if (metadata)
|
||||||
|
msg_->set_metadata(metadata);
|
||||||
|
return push_msg_to_session(msg_);
|
||||||
|
}
|
||||||
|
|
||||||
int zmq::stream_engine_t::write_credential (msg_t *msg_)
|
int zmq::stream_engine_t::write_credential (msg_t *msg_)
|
||||||
{
|
{
|
||||||
zmq_assert (mechanism != NULL);
|
zmq_assert (mechanism != NULL);
|
||||||
@ -938,6 +945,12 @@ void zmq::stream_engine_t::set_handshake_timer ()
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool zmq::stream_engine_t::init_properties (properties_t & properties) {
|
||||||
|
if (peer_address.empty()) return false;
|
||||||
|
properties.insert (std::make_pair("Peer-Address", peer_address));
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
void zmq::stream_engine_t::timer_event (int id_)
|
void zmq::stream_engine_t::timer_event (int id_)
|
||||||
{
|
{
|
||||||
zmq_assert (id_ == handshake_timer_id);
|
zmq_assert (id_ == handshake_timer_id);
|
||||||
|
@ -59,7 +59,7 @@ namespace zmq
|
|||||||
timeout_error
|
timeout_error
|
||||||
};
|
};
|
||||||
|
|
||||||
stream_engine_t (fd_t fd_, const options_t &options_,
|
stream_engine_t (fd_t fd_, const options_t &options_,
|
||||||
const std::string &endpoint);
|
const std::string &endpoint);
|
||||||
~stream_engine_t ();
|
~stream_engine_t ();
|
||||||
|
|
||||||
@ -77,7 +77,6 @@ namespace zmq
|
|||||||
void timer_event (int id_);
|
void timer_event (int id_);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
|
||||||
// Unplug the engine from the session.
|
// Unplug the engine from the session.
|
||||||
void unplug ();
|
void unplug ();
|
||||||
|
|
||||||
@ -99,6 +98,8 @@ namespace zmq
|
|||||||
int pull_msg_from_session (msg_t *msg_);
|
int pull_msg_from_session (msg_t *msg_);
|
||||||
int push_msg_to_session (msg_t *msg);
|
int push_msg_to_session (msg_t *msg);
|
||||||
|
|
||||||
|
int push_raw_msg_to_session (msg_t *msg);
|
||||||
|
|
||||||
int write_credential (msg_t *msg_);
|
int write_credential (msg_t *msg_);
|
||||||
int pull_and_encode (msg_t *msg_);
|
int pull_and_encode (msg_t *msg_);
|
||||||
int decode_and_push (msg_t *msg_);
|
int decode_and_push (msg_t *msg_);
|
||||||
@ -113,6 +114,9 @@ namespace zmq
|
|||||||
|
|
||||||
void set_handshake_timer();
|
void set_handshake_timer();
|
||||||
|
|
||||||
|
typedef metadata_t::dict_t properties_t;
|
||||||
|
bool init_properties (properties_t & properties);
|
||||||
|
|
||||||
// Underlying socket.
|
// Underlying socket.
|
||||||
fd_t s;
|
fd_t s;
|
||||||
|
|
||||||
|
@ -80,17 +80,32 @@ test_stream_to_dealer (void)
|
|||||||
assert (rc > 0);
|
assert (rc > 0);
|
||||||
assert (zmq_msg_more (&identity));
|
assert (zmq_msg_more (&identity));
|
||||||
|
|
||||||
|
// Verify the existence of Peer-Address metadata
|
||||||
|
char const* peer_address = zmq_msg_gets (&identity, "Peer-Address");
|
||||||
|
assert (peer_address != 0);
|
||||||
|
assert (streq (peer_address, "127.0.0.1"));
|
||||||
|
|
||||||
// Second frame is zero
|
// Second frame is zero
|
||||||
byte buffer [255];
|
byte buffer [255];
|
||||||
rc = zmq_recv (stream, buffer, 255, 0);
|
rc = zmq_recv (stream, buffer, 255, 0);
|
||||||
assert (rc == 0);
|
assert (rc == 0);
|
||||||
|
|
||||||
|
// Verify the existence of Peer-Address metadata
|
||||||
|
peer_address = zmq_msg_gets (&identity, "Peer-Address");
|
||||||
|
assert (peer_address != 0);
|
||||||
|
assert (streq (peer_address, "127.0.0.1"));
|
||||||
|
|
||||||
// Real data follows
|
// Real data follows
|
||||||
// First frame is identity
|
// First frame is identity
|
||||||
rc = zmq_msg_recv (&identity, stream, 0);
|
rc = zmq_msg_recv (&identity, stream, 0);
|
||||||
assert (rc > 0);
|
assert (rc > 0);
|
||||||
assert (zmq_msg_more (&identity));
|
assert (zmq_msg_more (&identity));
|
||||||
|
|
||||||
|
// Verify the existence of Peer-Address metadata
|
||||||
|
peer_address = zmq_msg_gets (&identity, "Peer-Address");
|
||||||
|
assert (peer_address != 0);
|
||||||
|
assert (streq (peer_address, "127.0.0.1"));
|
||||||
|
|
||||||
// Second frame is greeting signature
|
// Second frame is greeting signature
|
||||||
rc = zmq_recv (stream, buffer, 255, 0);
|
rc = zmq_recv (stream, buffer, 255, 0);
|
||||||
assert (rc == 10);
|
assert (rc == 10);
|
||||||
@ -182,7 +197,7 @@ test_stream_to_stream (void)
|
|||||||
// Set-up our context and sockets
|
// Set-up our context and sockets
|
||||||
void *ctx = zmq_ctx_new ();
|
void *ctx = zmq_ctx_new ();
|
||||||
assert (ctx);
|
assert (ctx);
|
||||||
|
|
||||||
void *server = zmq_socket (ctx, ZMQ_STREAM);
|
void *server = zmq_socket (ctx, ZMQ_STREAM);
|
||||||
assert (server);
|
assert (server);
|
||||||
int enabled = 1;
|
int enabled = 1;
|
||||||
@ -200,7 +215,7 @@ test_stream_to_stream (void)
|
|||||||
uint8_t id [256];
|
uint8_t id [256];
|
||||||
size_t id_size = 256;
|
size_t id_size = 256;
|
||||||
uint8_t buffer [256];
|
uint8_t buffer [256];
|
||||||
|
|
||||||
// Connecting sends a zero message
|
// Connecting sends a zero message
|
||||||
// Server: First frame is identity, second frame is zero
|
// Server: First frame is identity, second frame is zero
|
||||||
id_size = zmq_recv (server, id, 256, 0);
|
id_size = zmq_recv (server, id, 256, 0);
|
||||||
@ -223,19 +238,19 @@ test_stream_to_stream (void)
|
|||||||
// Second frame is HTTP GET request
|
// Second frame is HTTP GET request
|
||||||
rc = zmq_send (client, "GET /\n\n", 7, 0);
|
rc = zmq_send (client, "GET /\n\n", 7, 0);
|
||||||
assert (rc == 7);
|
assert (rc == 7);
|
||||||
|
|
||||||
// Get HTTP request; ID frame and then request
|
// Get HTTP request; ID frame and then request
|
||||||
id_size = zmq_recv (server, id, 256, 0);
|
id_size = zmq_recv (server, id, 256, 0);
|
||||||
assert (id_size > 0);
|
assert (id_size > 0);
|
||||||
rc = zmq_recv (server, buffer, 256, 0);
|
rc = zmq_recv (server, buffer, 256, 0);
|
||||||
assert (rc != -1);
|
assert (rc != -1);
|
||||||
assert (memcmp (buffer, "GET /\n\n", 7) == 0);
|
assert (memcmp (buffer, "GET /\n\n", 7) == 0);
|
||||||
|
|
||||||
// Send reply back to client
|
// Send reply back to client
|
||||||
char http_response [] =
|
char http_response [] =
|
||||||
"HTTP/1.0 200 OK\r\n"
|
"HTTP/1.0 200 OK\r\n"
|
||||||
"Content-Type: text/plain\r\n"
|
"Content-Type: text/plain\r\n"
|
||||||
"\r\n"
|
"\r\n"
|
||||||
"Hello, World!";
|
"Hello, World!";
|
||||||
rc = zmq_send (server, id, id_size, ZMQ_SNDMORE);
|
rc = zmq_send (server, id, id_size, ZMQ_SNDMORE);
|
||||||
assert (rc != -1);
|
assert (rc != -1);
|
||||||
|
Loading…
x
Reference in New Issue
Block a user