From a22714dd79021279ce64e7795be7f25ede8c1f83 Mon Sep 17 00:00:00 2001 From: Attila Mark Date: Tue, 26 Feb 2013 12:10:27 -0800 Subject: [PATCH] Improvements and fixes for performance of PUSH/PULL in local_thr and remote_thr. - option to set number of zmq threads on the command line for local_thr - option to set number of zmq_threads and workers in remote_thr - option to set SND/RCV buffer sizes on command line - option to set whether to PUSH/PULL on command line - option to set to use zmq_recv or zmq_msg for transfer on command line - better timing function - corrected and improved throughput reporting - HWM and DELAY socket options --- AUTHORS | 1 + perf/local_thr.cpp | 269 +++++++++++++++++++++++++++++++-------- perf/remote_thr.cpp | 297 +++++++++++++++++++++++++++++++++++++------- 3 files changed, 473 insertions(+), 94 deletions(-) diff --git a/AUTHORS b/AUTHORS index c900ae6c..55d55614 100644 --- a/AUTHORS +++ b/AUTHORS @@ -5,6 +5,7 @@ AJ Lewis Alexej Lotz Andrew Thompson Asko Kauppi +Attila Mark Barak Amar Ben Gray Bernd Prager diff --git a/perf/local_thr.cpp b/perf/local_thr.cpp index d379f473..8d183a9b 100644 --- a/perf/local_thr.cpp +++ b/perf/local_thr.cpp @@ -19,28 +19,115 @@ along with this program. If not, see . */ -#include "../include/zmq.h" -#include "../include/zmq_utils.h" +#include "zmq.h" +#include "zmq_utils.h" #include #include +#include +#include +#include +#include + +#define ZMSG 1 +#define DATA 0 + +typedef struct US_TIMER US_TIMER; + +struct US_TIMER{ + + struct timeval time_was; + struct timeval time_now; +}; +/* Records the current timer state +*/ +void tm_init( US_TIMER *t){ + + if( gettimeofday( &t->time_now, NULL) < 0){ perror( "d_timer_init()");} + + t->time_was = t->time_now; + +} + +/* Returns the time passed in microsecond precision in seconds since last init + of timer. +*/ +float tm_secs( US_TIMER *t){ + + register float seconds; + + if( gettimeofday( &t->time_now, NULL) < 0){ perror( "d_timer_init()");} + + seconds = ( ((float)( t->time_now.tv_sec - t->time_was.tv_sec)) + + (((float)( t->time_now.tv_usec - t->time_was.tv_usec)) / 1000000.0)); + + t->time_was = t->time_now; + + return( seconds); +} + +const char *bind_to; +int message_count = 1000; +int message_size = 1024; +int threads = 1; +int workers = 1; +int sndbuflen = 128*256; +int rcvbuflen = 128*256; +int flow = ZMQ_PULL; +int rec = DATA; + +void my_free (void *data, void *hint) +{ + // free (data); +} int main (int argc, char *argv []) { - if (argc != 4) { - printf ("usage: local_thr \n"); + US_TIMER timer; + void *ctx; + void *s; + int rc; + int i; + void *buf = NULL; + + if (argc != 9) { + printf ("usage: local_thr \n"); return 1; } - const char *bind_to = argv [1]; - size_t message_size = atoi (argv [2]); - int message_count = atoi (argv [3]); - void *ctx = zmq_init (1); + bind_to = argv [1]; + message_size = atoi (argv [2]); + message_count = atoi (argv [3]); + sndbuflen = atoi (argv [4]); + rcvbuflen = atoi (argv [5]); + if( !strcmp( argv [6], "PUSH")){ + flow = ZMQ_PUSH; + } + if( !strcmp( argv [6], "PULL")){ + flow = ZMQ_PULL; + } + if( !strcmp( argv [7], "ZMSG")){ + rec = ZMSG; + } + if( !strcmp( argv [7], "DATA")){ + rec = DATA; + } + threads = atoi (argv [8]); + + if( !(buf = malloc( message_size))){ perror("malloc"); return -1;} + + ctx = zmq_ctx_new (); if (!ctx) { - printf ("error in zmq_init: %s\n", zmq_strerror (errno)); + printf ("error in zmq_ctx_new: %s\n", zmq_strerror (errno)); return -1; } - void *s = zmq_socket (ctx, ZMQ_PULL); + rc = zmq_ctx_set ( ctx, ZMQ_IO_THREADS, threads); + if (rc) { + printf ("error in zmq_ctx_set: %s\n", zmq_strerror (errno)); + return -1; + } + + s = zmq_socket (ctx, flow); if (!s) { printf ("error in zmq_socket: %s\n", zmq_strerror (errno)); return -1; @@ -49,61 +136,141 @@ int main (int argc, char *argv []) // Add your socket options here. // For example ZMQ_RATE, ZMQ_RECOVERY_IVL and ZMQ_MCAST_LOOP for PGM. - int rc = zmq_bind (s, bind_to); + size_t rcvbuflenlen = (size_t)sizeof rcvbuflen; + size_t sndbuflenlen = (size_t)sizeof sndbuflen; + + rc = zmq_setsockopt (s, ZMQ_RCVBUF, &rcvbuflen, rcvbuflenlen); + if (rc != 0) { + printf ("error in zmq_setsockopt: %s\n", zmq_strerror (errno)); + return -1; + } + rc = zmq_setsockopt (s, ZMQ_SNDBUF, &sndbuflen, sndbuflenlen); + if (rc != 0) { + printf ("error in zmq_setsockopt: %s\n", zmq_strerror (errno)); + return -1; + } + + sndbuflen = 1; + rc = zmq_setsockopt (s, ZMQ_DELAY_ATTACH_ON_CONNECT, &sndbuflen, sndbuflenlen); + if (rc != 0) { + printf ("error in zmq_setsockopt: %s\n", zmq_strerror (errno)); + return -1; + } + + sndbuflen = 2; + rc = zmq_setsockopt (s, ZMQ_SNDHWM, &sndbuflen, sndbuflenlen); + if (rc != 0) { + printf ("error in zmq_setsockopt: %s\n", zmq_strerror (errno)); + return -1; + } + + sndbuflen = 2; + rc = zmq_setsockopt (s, ZMQ_RCVHWM, &sndbuflen, sndbuflenlen); + if (rc != 0) { + printf ("error in zmq_setsockopt: %s\n", zmq_strerror (errno)); + return -1; + } + + + rc = zmq_getsockopt (s, ZMQ_RCVBUF, &rcvbuflen, &rcvbuflenlen); + if (rc != 0) { + printf ("error in zmq_getsockopt: %s\n", zmq_strerror (errno)); + return -1; + } + rc = zmq_getsockopt (s, ZMQ_SNDBUF, &sndbuflen, &sndbuflenlen); + if (rc != 0) { + printf ("error in zmq_getsockopt: %s\n", zmq_strerror (errno)); + return -1; + } + + printf("RCVBUF=%d KB SNDBUF=%d KB adjusted\n", rcvbuflen/1024, sndbuflen/1024); + + printf("Threads: %d\n", zmq_ctx_get( ctx, ZMQ_IO_THREADS)); + + rc = zmq_bind (s, bind_to); if (rc != 0) { printf ("error in zmq_bind: %s\n", zmq_strerror (errno)); return -1; } - zmq_msg_t msg; - rc = zmq_msg_init (&msg); - if (rc != 0) { - printf ("error in zmq_msg_init: %s\n", zmq_strerror (errno)); - return -1; - } + if( !(buf = malloc( message_size))){ perror("malloc"); return -1;} - rc = zmq_recvmsg (s, &msg, 0); - if (rc < 0) { - printf ("error in zmq_recvmsg: %s\n", zmq_strerror (errno)); - return -1; - } - if (zmq_msg_size (&msg) != message_size) { - printf ("message of incorrect size received\n"); - return -1; - } + printf("%sING %s...\n", flow == ZMQ_PUSH ? "PUSH":"PULL", rec ? "ZMQ_MSG":"DATA"); - void *watch = zmq_stopwatch_start (); + tm_init( &timer); - int i; - for (i = 0; i != message_count - 1; i++) { - rc = zmq_recvmsg (s, &msg, 0); - if (rc < 0) { - printf ("error in zmq_recvmsg: %s\n", zmq_strerror (errno)); + if( flow == ZMQ_PULL){ + + if( rec == ZMSG){ + + zmq_msg_t msg; + + rc = zmq_msg_init (&msg); + if (rc != 0) { + printf ("error in zmq_msg_init: %s\n", zmq_strerror (errno)); return -1; } - if (zmq_msg_size (&msg) != message_size) { - printf ("message of incorrect size received\n"); - return -1; + for (i = 0; i != message_count; i++) { + rc = zmq_msg_recv (&msg, s, 0); + if (rc < 0) { + printf ("error in zmq_recv: %s\n", zmq_strerror (errno)); + return -1; + } } + rc = zmq_msg_close (&msg); + if (rc != 0) { + printf ("error in zmq_msg_close: %s\n", zmq_strerror (errno)); + return -1; + }} + + else{ + + for (i = 0; i != message_count; i++) { + rc = zmq_recv( s, buf, message_size, 0); + if (rc < 0) { + printf ("error in zmq_recv: %s\n", zmq_strerror (errno)); + return -1; + } + }} + } + else{ + + if( rec == ZMSG){ + + zmq_msg_t msg; + + for (i = 0; i != message_count; i++) { + + rc = zmq_msg_init_data (&msg, buf, message_size, NULL, NULL); + if (rc != 0) { + printf ("error in zmq_msg_init_data: %s\n", zmq_strerror (errno)); + return -1; + } + + rc = zmq_msg_send( &msg, s, 0); + if (rc < 0) { + printf ("error in zmq_send: %s\n", zmq_strerror (errno)); + return -1; + } + }} + + + else{ + for (i = 0; i != message_count; i++){ + rc = zmq_send( s, buf, message_size, 0); + if (rc < 0) { + printf ("error in zmq_send: %s\n", zmq_strerror (errno)); + return -1; + } + }} } - unsigned long elapsed = zmq_stopwatch_stop (watch); - if (elapsed == 0) - elapsed = 1; + float secs = tm_secs( &timer); + float total = (((float) message_count) * ((float) message_size)) / (1024.0*1024.0*1024.0); - rc = zmq_msg_close (&msg); - if (rc != 0) { - printf ("error in zmq_msg_close: %s\n", zmq_strerror (errno)); - return -1; - } + printf ("Message size: %d KBytes, time: %f secs\n", (int) message_size/1024, secs); + printf ("%sed %.3f GB @ %.3f GB/s\n", (flow == ZMQ_PULL) ? "Pull":"Push", total, total/secs); - double throughput = ((double) message_count / (double) elapsed) * 1000000; - double megabits = (double) (throughput * message_size * 8) / 1000000; - - printf ("message size: %d [B]\n", (int) message_size); - printf ("message count: %d\n", (int) message_count); - printf ("mean throughput: %d [msg/s]\n", (int) throughput); - printf ("mean throughput: %.3f [Mb/s]\n", (double) megabits); rc = zmq_close (s); if (rc != 0) { @@ -117,5 +284,7 @@ int main (int argc, char *argv []) return -1; } + if( buf) free( buf); + return 0; } diff --git a/perf/remote_thr.cpp b/perf/remote_thr.cpp index 328bdcea..2f98bd4e 100644 --- a/perf/remote_thr.cpp +++ b/perf/remote_thr.cpp @@ -19,82 +19,290 @@ along with this program. If not, see . */ -#include "../include/zmq.h" -#include "../include/zmq_utils.h" +#include "zmq.h" +#include "zmq_utils.h" #include #include +#include #include +#include +#include +#include +#include -int main (int argc, char *argv []) +#define ZMSG 1 +#define DATA 0 + +const char *connect_to; +int message_count = 1000; +int message_size = 1024; +int threads = 1; +int workers = 1; +int sndbuflen = 128*256; +int rcvbuflen = 128*256; +int flow = ZMQ_PUSH; +int rec = DATA; + +typedef struct US_TIMER US_TIMER; + +struct US_TIMER{ + + struct timeval time_was; + struct timeval time_now; +}; +/* Records the current timer state +*/ +void tm_init( US_TIMER *t){ + + if( gettimeofday( &t->time_now, NULL) < 0){ perror( "d_timer_init()");} + + t->time_was = t->time_now; + +} + +/* Returns the time passed in microsecond precision in seconds since last init + of timer. +*/ +float tm_secs( US_TIMER *t){ + + register float seconds; + + if( gettimeofday( &t->time_now, NULL) < 0){ perror( "d_timer_init()");} + + seconds = ( ((float)( t->time_now.tv_sec - t->time_was.tv_sec)) + + (((float)( t->time_now.tv_usec - t->time_was.tv_usec)) / 1000000.0)); + + t->time_was = t->time_now; + + return( seconds); +} + +void my_free (void *data, void *hint) { - const char *connect_to; - int message_count; - int message_size; - void *ctx; - void *s; - int rc; - int i; - zmq_msg_t msg; + //free (data); +} - if (argc != 4) { - printf ("usage: remote_thr " - "\n"); - return 1; - } - connect_to = argv [1]; - message_size = atoi (argv [2]); - message_count = atoi (argv [3]); +static void *worker_routine (void *ctx) { - ctx = zmq_init (1); - if (!ctx) { - printf ("error in zmq_init: %s\n", zmq_strerror (errno)); - return -1; - } + int rc,i; + void *buf = NULL; + + if( !(buf = malloc( message_size))){ perror("malloc"); return NULL;} + + void *s = zmq_socket (ctx, flow); - s = zmq_socket (ctx, ZMQ_PUSH); if (!s) { printf ("error in zmq_socket: %s\n", zmq_strerror (errno)); - return -1; + return NULL; } // Add your socket options here. // For example ZMQ_RATE, ZMQ_RECOVERY_IVL and ZMQ_MCAST_LOOP for PGM. + size_t rcvbuflenlen = (size_t)sizeof rcvbuflen; + size_t sndbuflenlen = (size_t)sizeof sndbuflen; + + rc = zmq_setsockopt (s, ZMQ_RCVBUF, &rcvbuflen, rcvbuflenlen); + if (rc != 0) { + printf ("error in zmq_setsockopt: %s\n", zmq_strerror (errno)); + return NULL; + } + rc = zmq_setsockopt (s, ZMQ_SNDBUF, &sndbuflen, sndbuflenlen); + if (rc != 0) { + printf ("error in zmq_setsockopt: %s\n", zmq_strerror (errno)); + return NULL; + } + + sndbuflen = 1; + rc = zmq_setsockopt (s, ZMQ_DELAY_ATTACH_ON_CONNECT, &sndbuflen, sndbuflenlen); + if (rc != 0) { + printf ("error in zmq_setsockopt: %s\n", zmq_strerror (errno)); + return NULL; + } + + sndbuflen = 2; + rc = zmq_setsockopt (s, ZMQ_SNDHWM, &sndbuflen, sndbuflenlen); + if (rc != 0) { + printf ("error in zmq_setsockopt: %s\n", zmq_strerror (errno)); + return NULL; + } + + sndbuflen = 2; + rc = zmq_setsockopt (s, ZMQ_RCVHWM, &sndbuflen, sndbuflenlen); + if (rc != 0) { + printf ("error in zmq_setsockopt: %s\n", zmq_strerror (errno)); + return NULL; + } + + rc = zmq_getsockopt (s, ZMQ_RCVBUF, &rcvbuflen, &rcvbuflenlen); + if (rc != 0) { + printf ("error in zmq_getsockopt: %s\n", zmq_strerror (errno)); + return NULL; + } + rc = zmq_getsockopt (s, ZMQ_SNDBUF, &sndbuflen, &sndbuflenlen); + if (rc != 0) { + printf ("error in zmq_getsockopt: %s\n", zmq_strerror (errno)); + return NULL; + } + + printf("RCVBUF=%d KB SNDBUF=%d KB adjusted\n", rcvbuflen/1024, sndbuflen/1024); + + rc = zmq_connect (s, connect_to); if (rc != 0) { printf ("error in zmq_connect: %s\n", zmq_strerror (errno)); - return -1; + return NULL; } - for (i = 0; i != message_count; i++) { + printf("%sING %s...\n", flow == ZMQ_PUSH ? "PUSH":"PULL", rec ? "ZMQ_MSG":"DATA"); - rc = zmq_msg_init_size (&msg, message_size); - if (rc != 0) { - printf ("error in zmq_msg_init_size: %s\n", zmq_strerror (errno)); - return -1; - } -#if defined ZMQ_MAKE_VALGRIND_HAPPY - memset (zmq_msg_data (&msg), 0, message_size); -#endif + if( flow == ZMQ_PUSH){ + + if( rec == ZMSG){ + + zmq_msg_t msg; - rc = zmq_sendmsg (s, &msg, 0); - if (rc < 0) { - printf ("error in zmq_sendmsg: %s\n", zmq_strerror (errno)); - return -1; - } - rc = zmq_msg_close (&msg); + for (i = 0; i != message_count; i++) { + + rc = zmq_msg_init_data (&msg, buf, message_size, NULL, NULL); + if (rc != 0) { + printf ("error in zmq_msg_init_data: %s\n", zmq_strerror (errno)); + return NULL; + } + + rc = zmq_msg_send( &msg, s, 0); + if (rc < 0) { + printf ("error in zmq_send: %s\n", zmq_strerror (errno)); + return NULL; + } + + rc = zmq_msg_close (&msg); + if (rc != 0) { + printf ("error in zmq_msg_close: %s\n", zmq_strerror (errno)); + exit (1); + } + }} + + else{ + + for (i = 0; i != message_count; i++) { + rc = zmq_send( s, buf, message_size, 0); + if (rc < 0) { + printf ("error in zmq_send: %s\n", zmq_strerror (errno)); + return NULL; + } + }} + + } + else{ + + if( rec == ZMSG){ + + zmq_msg_t msg; + + rc = zmq_msg_init (&msg); if (rc != 0) { - printf ("error in zmq_msg_close: %s\n", zmq_strerror (errno)); - return -1; + printf ("error in zmq_msg_init: %s\n", zmq_strerror (errno)); + return NULL; } + for (i = 0; i != message_count; i++) { + rc = zmq_msg_recv (&msg, s, 0); + if (rc < 0) { + printf ("error in zmq_recv: %s\n", zmq_strerror (errno)); + return NULL; + } + }} + + else{ + + for (i = 0; i != message_count; i++) { + rc = zmq_recv( s, buf, message_size, 0); + if (rc < 0) { + printf ("error in zmq_recv: %s\n", zmq_strerror (errno)); + return NULL; + } + }} } rc = zmq_close (s); if (rc != 0) { printf ("error in zmq_close: %s\n", zmq_strerror (errno)); + return NULL; + } + + free( buf); + + return NULL; +} + +int main (int argc, char *argv []) +{ + void *ctx; + int rc; + int i; + void *p; + + if (argc != 10) { + printf ("usage: remote_thr \n"); + return 1; + } + + connect_to = argv [1]; + message_size = atoi (argv [2]); + message_count = atoi (argv [3]); + sndbuflen = atoi (argv [4]); + rcvbuflen = atoi (argv [5]); + if( !strcmp( argv [6], "PUSH")){ + flow = ZMQ_PUSH; + } + if( !strcmp( argv [6], "PULL")){ + flow = ZMQ_PULL; + } + if( !strcmp( argv [7], "ZMSG")){ + rec = ZMSG; + } + if( !strcmp( argv [7], "DATA")){ + rec = DATA; + } + threads = atoi (argv [8]); + workers = atoi (argv [9]); + + + ctx = zmq_ctx_new (); + if (!ctx) { + printf ("error in zmq_ctx_new: %s\n", zmq_strerror (errno)); return -1; } + rc = zmq_ctx_set ( ctx, ZMQ_IO_THREADS, threads); + if (rc) { + printf ("error in zmq_ctx_set: %s\n", zmq_strerror (errno)); + return -1; + } + + printf("Threads: %d, workers %d\n", zmq_ctx_get( ctx, ZMQ_IO_THREADS), workers); + pthread_t worker[128]; + + US_TIMER timer; + + tm_init( &timer); + + for (i = 0; i < workers; i++) { + pthread_create (&worker[i], NULL, worker_routine, ctx); + printf("Worker %d spawned\n", i); + } + + for (i = 0; i < workers; i++) { + pthread_join( worker[i], &p); + printf("Worker %d joined\n", i); + } + + float secs = tm_secs( &timer); + float total = ( (float)workers)*(((float) message_count) * ((float) message_size)) / (1024.0*1024.0*1024.0); + + printf ("Message: size: %d KBytes, count: %d/workers(%d), time: %f secs\n", (int) message_size/1024, message_count, workers, secs); + printf ("%sed %.3f GB @ %.3f GB/s\n", (flow == ZMQ_PULL) ? "Pull":"Push", total, total/secs); + rc = zmq_term (ctx); if (rc != 0) { printf ("error in zmq_term: %s\n", zmq_strerror (errno)); @@ -103,3 +311,4 @@ int main (int argc, char *argv []) return 0; } +