diff --git a/src/router.cpp b/src/router.cpp index 77526e4e..49c01f10 100644 --- a/src/router.cpp +++ b/src/router.cpp @@ -401,6 +401,8 @@ bool zmq::router_t::xhas_in () errno_assert (rc == 0); memcpy (_prefetched_id.data (), routing_id.data (), routing_id.size ()); _prefetched_id.set_flags (msg_t::more); + if (_prefetched_msg.metadata ()) + _prefetched_id.set_metadata (_prefetched_msg.metadata ()); _prefetched = true; _routing_id_sent = false; diff --git a/tests/test_metadata.cpp b/tests/test_metadata.cpp index 3ae3643e..642e8d6e 100644 --- a/tests/test_metadata.cpp +++ b/tests/test_metadata.cpp @@ -127,10 +127,61 @@ void test_metadata () zmq_threadclose (zap_thread); } +void test_router_prefetch_metadata () +{ + char my_endpoint[MAX_SOCKET_STRING]; + setup_test_context (); + + // Spawn ZAP handler + // We create and bind ZAP socket in main thread to avoid case + // where child thread does not start up fast enough. + void *handler = zmq_socket (get_test_context (), ZMQ_REP); + TEST_ASSERT_NOT_NULL (handler); + TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (handler, "inproc://zeromq.zap.01")); + void *zap_thread = zmq_threadstart (&zap_handler, handler); + + void *server = test_context_socket (ZMQ_ROUTER); + void *client = test_context_socket (ZMQ_REQ); + TEST_ASSERT_SUCCESS_ERRNO ( + zmq_setsockopt (server, ZMQ_ZAP_DOMAIN, "DOMAIN", 6)); + bind_loopback_ipv4 (server, my_endpoint, sizeof (my_endpoint)); + TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (client, my_endpoint)); + + send_string_expect_success (client, "This is a message", 0); + + msleep (SETTLE_TIME); + + // Check for messages in the ROUTER socket which will trigger a prefetch + unsigned long int dummy; + size_t dummy_size = sizeof (dummy); + zmq_getsockopt (server, ZMQ_EVENTS, &dummy, &dummy_size); + + zmq_msg_t msg; + + // Ensure all frames in the message contain metadata + for (int i = 0; i < 3; i++) { + zmq_msg_init (&msg); + TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&msg, server, 0)); + TEST_ASSERT_EQUAL_STRING ("World", zmq_msg_gets (&msg, "Hello")); + zmq_msg_close (&msg); + } + + test_context_socket_close_zero_linger (client); + test_context_socket_close_zero_linger (server); + + // Shutdown + teardown_test_context (); + + // Wait until ZAP handler terminates + zmq_threadclose (zap_thread); +} + + int main () { setup_test_environment (); UNITY_BEGIN (); RUN_TEST (test_metadata); + RUN_TEST (test_router_prefetch_metadata); return UNITY_END (); }