0
0
mirror of https://github.com/zeromq/libzmq.git synced 2025-01-14 01:37:56 +08:00

Problem: norm fails to compile under windows (#4123)

* Makes norm useable (but maybe slow) on windows
This commit is contained in:
mjvankampen 2021-01-18 09:42:14 +01:00 committed by GitHub
parent 53104ec1b9
commit 2dd24d6d80
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 219 additions and 7 deletions

View File

@ -513,7 +513,8 @@ test_apps = \
tests/test_reconnect_ivl \ tests/test_reconnect_ivl \
tests/test_mock_pub_sub \ tests/test_mock_pub_sub \
tests/test_socket_null \ tests/test_socket_null \
tests/test_tcp_accept_filter tests/test_tcp_accept_filter \
tests/test_pubsub
UNITY_CPPFLAGS = -I$(top_srcdir)/external/unity -DUNITY_USE_COMMAND_LINE_ARGS -DUNITY_EXCLUDE_FLOAT UNITY_CPPFLAGS = -I$(top_srcdir)/external/unity -DUNITY_USE_COMMAND_LINE_ARGS -DUNITY_EXCLUDE_FLOAT
UNITY_LIBS = $(top_builddir)/external/unity/libunity.a UNITY_LIBS = $(top_builddir)/external/unity/libunity.a
@ -828,6 +829,10 @@ tests_test_tcp_accept_filter_SOURCES = tests/test_tcp_accept_filter.cpp
tests_test_tcp_accept_filter_LDADD = ${TESTUTIL_LIBS} src/libzmq.la tests_test_tcp_accept_filter_LDADD = ${TESTUTIL_LIBS} src/libzmq.la
tests_test_tcp_accept_filter_CPPFLAGS = ${TESTUTIL_CPPFLAGS} tests_test_tcp_accept_filter_CPPFLAGS = ${TESTUTIL_CPPFLAGS}
tests_test_pubsub_SOURCES = tests/test_pubsub.cpp
tests_test_pubsub_LDADD = ${TESTUTIL_LIBS} src/libzmq.la
tests_test_pubsub_CPPFLAGS = ${TESTUTIL_CPPFLAGS}
if HAVE_CURVE if HAVE_CURVE
test_apps += \ test_apps += \

View File

@ -6,7 +6,7 @@ Mozilla Public License v2 (MPLv2) or any other Open Source Initiative
approved license chosen by the current ZeroMQ BDFL (Benevolent approved license chosen by the current ZeroMQ BDFL (Benevolent
Dictator for Life). Dictator for Life).
A portion of the commits made by the Github handle "mjvk", with A portion of the commits made by the Github handle "mjvk" and "mjvankampen", with
commit author "Mark Jan van Kampen <markjanvankampen@gmail.com>", are commit author "Mark Jan van Kampen <markjanvankampen@gmail.com>", are
copyright of Mark Jan van Kampen. This document hereby grants the libzmq copyright of Mark Jan van Kampen. This document hereby grants the libzmq
project team to relicense libzmq, including all past, present and project team to relicense libzmq, including all past, present and

View File

@ -901,6 +901,17 @@
'dependencies': [ 'dependencies': [
'libzmq' 'libzmq'
], ],
},
{
'target_name': 'test_pubsub',
'type': 'executable',
'sources': [
'../../tests/test_pubsub.cpp',
'../../tests/testutil.hpp'
],
'dependencies': [
'libzmq'
],
} }
] ]
} }

View File

@ -81,4 +81,5 @@
<test name = "test_filter_ipc" /> <test name = "test_filter_ipc" />
<test name = "test_fork" /> <test name = "test_fork" />
<test name = "test_abstract_ipc" /> <test name = "test_abstract_ipc" />
<test name = "test_pubsub" />
</tests> </tests>

View File

@ -6,9 +6,26 @@
#if defined ZMQ_HAVE_NORM #if defined ZMQ_HAVE_NORM
#include "norm_engine.hpp" #include "norm_engine.hpp"
#ifdef ZMQ_USE_NORM_SOCKET_WRAPPER
#include "ip.hpp"
#endif
#include "session_base.hpp" #include "session_base.hpp"
#include "v2_protocol.hpp" #include "v2_protocol.hpp"
#ifdef ZMQ_USE_NORM_SOCKET_WRAPPER
struct norm_wrapper_thread_args_t
{
NormDescriptor norm_descriptor;
SOCKET wrapper_write_fd;
NormInstanceHandle norm_instance_handle;
};
DWORD WINAPI normWrapperThread (LPVOID lpParam);
#endif
zmq::norm_engine_t::norm_engine_t (io_thread_t *parent_, zmq::norm_engine_t::norm_engine_t (io_thread_t *parent_,
const options_t &options_) : const options_t &options_) :
io_object_t (parent_), io_object_t (parent_),
@ -219,6 +236,18 @@ void zmq::norm_engine_t::shutdown ()
void zmq::norm_engine_t::plug (io_thread_t *io_thread_, void zmq::norm_engine_t::plug (io_thread_t *io_thread_,
session_base_t *session_) session_base_t *session_)
{ {
#ifdef ZMQ_USE_NORM_SOCKET_WRAPPER
norm_wrapper_thread_args_t *threadArgs = new norm_wrapper_thread_args_t;
int rc = make_fdpair (&wrapper_read_fd, &threadArgs->wrapper_write_fd);
threadArgs->norm_descriptor = NormGetDescriptor (norm_instance);
threadArgs->norm_instance_handle = norm_instance;
norm_descriptor_handle = add_fd (wrapper_read_fd);
#else
fd_t normDescriptor = NormGetDescriptor (norm_instance);
norm_descriptor_handle = add_fd (normDescriptor);
#endif
// Set POLLIN for notification of pending NormEvents
set_pollin (norm_descriptor_handle);
// TBD - we may assign the NORM engine to an io_thread in the future??? // TBD - we may assign the NORM engine to an io_thread in the future???
zmq_session = session_; zmq_session = session_;
if (is_sender) if (is_sender)
@ -226,20 +255,30 @@ void zmq::norm_engine_t::plug (io_thread_t *io_thread_,
if (is_receiver) if (is_receiver)
zmq_input_ready = true; zmq_input_ready = true;
fd_t normDescriptor = NormGetDescriptor (norm_instance);
norm_descriptor_handle = add_fd (normDescriptor);
// Set POLLIN for notification of pending NormEvents
set_pollin (norm_descriptor_handle);
if (is_sender) if (is_sender)
send_data (); send_data ();
#ifdef ZMQ_USE_NORM_SOCKET_WRAPPER
wrapper_thread_handle = CreateThread (NULL, 0, normWrapperThread,
threadArgs, 0, &wrapper_thread_id);
#endif
} // end zmq::norm_engine_t::init() } // end zmq::norm_engine_t::init()
void zmq::norm_engine_t::unplug () void zmq::norm_engine_t::unplug ()
{ {
rm_fd (norm_descriptor_handle); rm_fd (norm_descriptor_handle);
#ifdef ZMQ_USE_NORM_SOCKET_WRAPPER
PostThreadMessage (wrapper_thread_id, WM_QUIT, (WPARAM) NULL,
(LPARAM) NULL);
WaitForSingleObject (wrapper_thread_handle, INFINITE);
DWORD exitCode;
GetExitCodeThread (wrapper_thread_handle, &exitCode);
zmq_assert (exitCode != -1);
int rc = closesocket (wrapper_read_fd);
errno_assert (rc != -1);
#endif
zmq_session = NULL; zmq_session = NULL;
} // end zmq::norm_engine_t::unplug() } // end zmq::norm_engine_t::unplug()
@ -329,11 +368,17 @@ void zmq::norm_engine_t::in_event ()
{ {
// This means a NormEvent is pending, so call NormGetNextEvent() and handle // This means a NormEvent is pending, so call NormGetNextEvent() and handle
NormEvent event; NormEvent event;
#ifdef ZMQ_USE_NORM_SOCKET_WRAPPER
int rc = recv (wrapper_read_fd, reinterpret_cast<char *> (&event),
sizeof (event), 0);
errno_assert (rc == sizeof (event));
#else
if (!NormGetNextEvent (norm_instance, &event)) { if (!NormGetNextEvent (norm_instance, &event)) {
// NORM has died before we unplugged?! // NORM has died before we unplugged?!
zmq_assert (false); zmq_assert (false);
return; return;
} }
#endif
switch (event.type) { switch (event.type) {
case NORM_TX_QUEUE_VACANCY: case NORM_TX_QUEUE_VACANCY:
@ -718,4 +763,60 @@ const zmq::endpoint_uri_pair_t &zmq::norm_engine_t::get_endpoint () const
return _empty_endpoint; return _empty_endpoint;
} }
#ifdef ZMQ_USE_NORM_SOCKET_WRAPPER
#include <iostream>
DWORD WINAPI normWrapperThread (LPVOID lpParam)
{
norm_wrapper_thread_args_t *norm_wrapper_thread_args =
(norm_wrapper_thread_args_t *) lpParam;
NormEvent message;
DWORD waitRc;
DWORD exitCode = 0;
int rc;
for (;;) {
// wait for norm event or message
waitRc = MsgWaitForMultipleObjectsEx (
1, &norm_wrapper_thread_args->norm_descriptor, INFINITE,
QS_ALLPOSTMESSAGE, 0);
// Check if norm event
if (waitRc == WAIT_OBJECT_0) {
// Process norm event
if (!NormGetNextEvent (
norm_wrapper_thread_args->norm_instance_handle, &message)) {
exitCode = -1;
break;
}
rc =
send (norm_wrapper_thread_args->wrapper_write_fd,
reinterpret_cast<char *> (&message), sizeof (message), 0);
errno_assert (rc != -1);
// Check if message
} else if (waitRc == WAIT_OBJECT_0 + 1) {
// Exit if WM_QUIT is received otherwise do nothing
MSG message;
GetMessage (&message, 0, 0, 0);
if (message.message == WM_QUIT) {
break;
} else {
// do nothing
}
// Otherwise an error occurred
} else {
exitCode = -1;
break;
}
}
// Free resources
rc = closesocket (norm_wrapper_thread_args->wrapper_write_fd);
free (norm_wrapper_thread_args);
errno_assert (rc != -1);
return exitCode;
}
#endif
#endif // ZMQ_HAVE_NORM #endif // ZMQ_HAVE_NORM

View File

@ -4,6 +4,11 @@
#if defined ZMQ_HAVE_NORM #if defined ZMQ_HAVE_NORM
#if defined(ZMQ_HAVE_WINDOWS) && defined(ZMQ_IOTHREAD_POLLER_USE_EPOLL)
#define ZMQ_USE_NORM_SOCKET_WRAPPER
#endif
#include "io_object.hpp" #include "io_object.hpp"
#include "i_engine.hpp" #include "i_engine.hpp"
#include "options.hpp" #include "options.hpp"
@ -186,6 +191,12 @@ class norm_engine_t ZMQ_FINAL : public io_object_t, public i_engine
NormRxStreamState::List NormRxStreamState::List
msg_ready_list; // rx streams w/ msg ready for push to zmq msg_ready_list; // rx streams w/ msg ready for push to zmq
#ifdef ZMQ_USE_NORM_SOCKET_WRAPPER
fd_t
wrapper_read_fd; // filedescriptor used to read norm events through the wrapper
DWORD wrapper_thread_id;
HANDLE wrapper_thread_handle;
#endif
}; // end class norm_engine_t }; // end class norm_engine_t
} }

View File

@ -24,6 +24,7 @@ set(tests
test_term_endpoint test_term_endpoint
test_router_mandatory test_router_mandatory
test_probe_router test_probe_router
test_pubsub
test_stream test_stream
test_stream_empty test_stream_empty
test_stream_disconnect test_stream_disconnect

82
tests/test_pubsub.cpp Normal file
View File

@ -0,0 +1,82 @@
/*
Copyright (c) 2007-2020 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
libzmq is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License (LGPL) as published
by the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
As a special exception, the Contributors give you permission to link
this library with independent modules to produce an executable,
regardless of the license terms of these independent modules, and to
copy and distribute the resulting executable under terms of your choice,
provided that you also meet, for each linked independent module, the
terms and conditions of the license of that module. An independent
module is a module which is not derived from or based on this library.
If you modify this library, you must extend this exception to your
version of the library.
libzmq is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "testutil.hpp"
#include "testutil_unity.hpp"
SETUP_TEARDOWN_TESTCONTEXT
void test (const char *address)
{
// Create a publisher
void *publisher = test_context_socket (ZMQ_PUB);
char my_endpoint[MAX_SOCKET_STRING];
// Bind publisher
test_bind (publisher, address, my_endpoint, MAX_SOCKET_STRING);
// Create a subscriber
void *subscriber = test_context_socket (ZMQ_SUB);
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (subscriber, my_endpoint));
// Subscribe to all messages.
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, "", 0));
// Wait a bit till the subscription gets to the publisher
msleep (SETTLE_TIME);
// Send an empty message
send_string_expect_success (publisher, "test", 0);
// Receive the message in the subscriber
recv_string_expect_success (subscriber, "test", 0);
// Clean up.
test_context_socket_close (publisher);
test_context_socket_close (subscriber);
}
void test_norm ()
{
#if defined ZMQ_HAVE_NORM
test ("norm://224.1.2.3:5556");
#else
TEST_IGNORE_MESSAGE ("libzmq without NORM, ignoring test");
#endif
}
int main ()
{
setup_test_environment ();
UNITY_BEGIN ();
RUN_TEST (test_norm);
return UNITY_END ();
}