From aec47b364225c347a7edca778185f407308c2e83 Mon Sep 17 00:00:00 2001 From: Martin Hurton Date: Thu, 8 Nov 2012 14:31:26 +0100 Subject: [PATCH] Extend ZMQ_ROUTER_RAW test Add test when the zeromq socket connects to a TCP socket. The test now fails due to bug in the zeromq library. --- tests/test_raw_sock.cpp | 103 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 103 insertions(+) diff --git a/tests/test_raw_sock.cpp b/tests/test_raw_sock.cpp index 8243b1ea..d9c48fa6 100644 --- a/tests/test_raw_sock.cpp +++ b/tests/test_raw_sock.cpp @@ -32,6 +32,7 @@ #include #include #include +#include //ToDo: Windows? const char *test_str = "TEST-STRING"; @@ -63,6 +64,42 @@ int tcp_client () return sockfd; } +int tcp_server () +{ + int listenfd = socket (AF_INET, SOCK_STREAM, 0); + assert (listenfd != -1); + + int flag = 1; + int rc = setsockopt (listenfd, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof flag); + assert (rc == 0); + + struct sockaddr_in serv_addr; + bzero (&serv_addr, sizeof serv_addr); + serv_addr.sin_family = AF_INET; + serv_addr.sin_addr.s_addr = htonl (INADDR_ANY); + serv_addr.sin_port = htons (5555); + + rc = bind (listenfd, (struct sockaddr *) &serv_addr, sizeof serv_addr); + assert (rc == 0); + + rc = listen (listenfd, 8); + assert (rc == 0); + + int sockfd = accept (listenfd, NULL, NULL); + assert (sockfd != -1); + + rc = close (listenfd); + assert (rc == 0); + + int flags = fcntl (sockfd, F_GETFL, 0); + if (flags == -1) + flags = 0; + rc = fcntl (sockfd, F_SETFL, flags | O_NONBLOCK); + assert (rc != -1); + + return sockfd; +} + void tcp_client_write (int sockfd, const void *buf, int buf_len) { assert (buf); @@ -90,11 +127,75 @@ void tcp_client_read (int sockfd) assert (memcmp (buffer, test_str, strlen (test_str)) == 0); } +size_t tcp_read (int s, char *buf, size_t bufsize) +{ + size_t bytes_read = 0; + + struct pollfd pfd = {s, POLLIN}; + int rc = poll (&pfd, 1, 100); + + while (rc > 0 && bytes_read < bufsize) { + int n = read (s, buf + bytes_read, bufsize - bytes_read); + if (n <= 0) + return bytes_read; + bytes_read += n; + rc = poll (&pfd, 1, 100); + } + + return bytes_read; +} + void tcp_client_close (int sockfd) { close (sockfd); } +void test_zmq_connect () +{ + void *ctx = zmq_init (1); + assert (ctx); + + void *zs = zmq_socket (ctx, ZMQ_ROUTER); + assert (zs); + + int rc = zmq_setsockopt (zs, ZMQ_IDENTITY, "X", 1); + assert (rc == 0); + + int raw_sock = 1; + rc = zmq_setsockopt (zs, ZMQ_ROUTER_RAW, &raw_sock, sizeof raw_sock); + assert (rc == 0); + + rc = zmq_connect (zs, "tcp://127.0.0.1:5555"); + assert (rc == 0); + + int i; + for (i = 0; i < 8; i++) { + int server_fd = tcp_server (); + assert (server_fd != -1); + + zmq_msg_t msg; + rc = zmq_msg_init_size (&msg, strlen (test_str)); + assert (rc == 0); + memcpy (zmq_msg_data (&msg), test_str, strlen (test_str)); + rc = zmq_msg_send (&msg, zs, 0); + + char buffer [128]; + size_t bytes_read = tcp_read (server_fd, buffer, sizeof buffer); + + assert (bytes_read == strlen (test_str) + || memcmp (buffer, test_str, bytes_read) == 0); + + rc = close (server_fd); + assert (rc == 0); + } + + rc = zmq_close (zs); + assert (rc == 0); + + rc = zmq_term (ctx); + assert (rc == 0); +} + int main () { fprintf (stderr, "test_raw_sock running...\n"); @@ -148,6 +249,8 @@ int main () zmq_close (sb); zmq_term (ctx); + test_zmq_connect (); + fprintf (stderr, "test_raw_sock PASSED.\n"); return 0;