From c980820d29d483da05e203858bd1a38b718f334f Mon Sep 17 00:00:00 2001 From: Pieter Hintjens Date: Mon, 15 Apr 2013 17:18:03 +0200 Subject: [PATCH] I'm reverting the various changes to the throughput test programs since as far as I can see, these didn't work any more. At the very least, the command line API was broken and forced the user to enter new, exotic arguments. Patches should not break existing APIs. But also, the internals of these programs had become weird. If we want to build more complex performance tests, that's fine, but we should make new programs, not break the old ones. We need minimal, safe performance tests in 0MQ. Also, the code was quite horrid. So it's gone. If anyone wants to bring it back please make the code neat, and build new APIs instead of breaking the old ones. Cheers Pieter --- perf/local_thr.cpp | 307 ++++++++------------------------------ perf/remote_thr.cpp | 348 +++++--------------------------------------- 2 files changed, 98 insertions(+), 557 deletions(-) diff --git a/perf/local_thr.cpp b/perf/local_thr.cpp index 4d291957..ab7a15e0 100644 --- a/perf/local_thr.cpp +++ b/perf/local_thr.cpp @@ -1,7 +1,5 @@ /* - Copyright (c) 2007-2012 iMatix Corporation - Copyright (c) 2009-2011 250bpm s.r.o. - Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file + Copyright (c) 2007-2013 Contributors as noted in the AUTHORS file This file is part of 0MQ. @@ -23,143 +21,37 @@ #include "../include/zmq_utils.h" #include #include -#include -#include -#include - -#include "platform.hpp" - -#ifndef ZMQ_HAVE_WINDOWS -#include -#endif - -#define ZMSG 1 -#define DATA 0 - -typedef struct US_TIMER US_TIMER; - -struct US_TIMER{ - - struct timeval time_was; - struct timeval time_now; -}; -/* Records the current timer state -*/ -void tm_init( US_TIMER *t){ -#if defined ZMQ_HAVE_WINDOWS - - // Get the high resolution counter's accuracy. - LARGE_INTEGER ticksPerSecond; - QueryPerformanceFrequency (&ticksPerSecond); - - // What time is it? - LARGE_INTEGER tick; - if ( !QueryPerformanceCounter (&tick) ) { perror( "tm_init()" ); } - - // Seconds - t->time_now.tv_sec = (long)( tick.QuadPart / ticksPerSecond.QuadPart ); - // Microseconds - t->time_now.tv_usec = (long)( ( tick.QuadPart - t->time_now.tv_sec * ticksPerSecond.QuadPart ) * 1000000 / ticksPerSecond.QuadPart ); -#else - if( gettimeofday( &t->time_now, NULL) < 0){ perror( "tm_init()");} -#endif - t->time_was = t->time_now; - -} - -/* Returns the time passed in microsecond precision in seconds since last init - of timer. -*/ -float tm_secs( US_TIMER *t){ - - register float seconds; -#if defined ZMQ_HAVE_WINDOWS - // Get the high resolution counter's accuracy. - LARGE_INTEGER ticksPerSecond; - QueryPerformanceFrequency (&ticksPerSecond); - - // What time is it? - LARGE_INTEGER tick; - if ( !QueryPerformanceCounter (&tick) ) { perror( "tm_secs()" ); } - - // Seconds - t->time_now.tv_sec = (long)( tick.QuadPart / ticksPerSecond.QuadPart ); - // Microseconds - t->time_now.tv_usec = (long)( ( tick.QuadPart - t->time_now.tv_sec * ticksPerSecond.QuadPart ) * 1000000 / ticksPerSecond.QuadPart ); -#else - if( gettimeofday( &t->time_now, NULL) < 0){ perror( "tm_secs()");} -#endif - seconds = ( ((float)( t->time_now.tv_sec - t->time_was.tv_sec)) + - (((float)( t->time_now.tv_usec - t->time_was.tv_usec)) / 1000000.0)); - - t->time_was = t->time_now; - - return( seconds); -} - -const char *bind_to; -int message_count = 1000; -int message_size = 1024; -int threads = 1; -int workers = 1; -int sndbuflen = 128*256; -int rcvbuflen = 128*256; -int flow = ZMQ_PULL; -int rec = DATA; - -void my_free (void *data, void *hint) -{ - // free (data); -} int main (int argc, char *argv []) { - US_TIMER timer; + const char *bind_to; + int message_count; + size_t message_size; void *ctx; void *s; int rc; int i; - void *buf = NULL; + zmq_msg_t msg; + void *watch; + unsigned long elapsed; + unsigned long throughput; + double megabits; - if (argc != 9) { - printf ("usage: local_thr \n"); + if (argc != 4) { + printf ("usage: local_thr \n"); return 1; } - bind_to = argv [1]; message_size = atoi (argv [2]); message_count = atoi (argv [3]); - sndbuflen = atoi (argv [4]); - rcvbuflen = atoi (argv [5]); - if( !strcmp( argv [6], "PUSH")){ - flow = ZMQ_PUSH; - } - if( !strcmp( argv [6], "PULL")){ - flow = ZMQ_PULL; - } - if( !strcmp( argv [7], "ZMSG")){ - rec = ZMSG; - } - if( !strcmp( argv [7], "DATA")){ - rec = DATA; - } - threads = atoi (argv [8]); - if( !(buf = malloc( message_size))){ perror("malloc"); return -1;} - - ctx = zmq_ctx_new (); + ctx = zmq_init (1); if (!ctx) { - printf ("error in zmq_ctx_new: %s\n", zmq_strerror (errno)); + printf ("error in zmq_init: %s\n", zmq_strerror (errno)); return -1; } - rc = zmq_ctx_set ( ctx, ZMQ_IO_THREADS, threads); - if (rc) { - printf ("error in zmq_ctx_set: %s\n", zmq_strerror (errno)); - return -1; - } - - s = zmq_socket (ctx, flow); + s = zmq_socket (ctx, ZMQ_PULL); if (!s) { printf ("error in zmq_socket: %s\n", zmq_strerror (errno)); return -1; @@ -168,141 +60,60 @@ int main (int argc, char *argv []) // Add your socket options here. // For example ZMQ_RATE, ZMQ_RECOVERY_IVL and ZMQ_MCAST_LOOP for PGM. - size_t rcvbuflenlen = (size_t)sizeof rcvbuflen; - size_t sndbuflenlen = (size_t)sizeof sndbuflen; - - rc = zmq_setsockopt (s, ZMQ_RCVBUF, &rcvbuflen, rcvbuflenlen); - if (rc != 0) { - printf ("error in zmq_setsockopt: %s\n", zmq_strerror (errno)); - return -1; - } - rc = zmq_setsockopt (s, ZMQ_SNDBUF, &sndbuflen, sndbuflenlen); - if (rc != 0) { - printf ("error in zmq_setsockopt: %s\n", zmq_strerror (errno)); - return -1; - } - - sndbuflen = 1; - rc = zmq_setsockopt (s, ZMQ_DELAY_ATTACH_ON_CONNECT, &sndbuflen, sndbuflenlen); - if (rc != 0) { - printf ("error in zmq_setsockopt: %s\n", zmq_strerror (errno)); - return -1; - } - - sndbuflen = 2; - rc = zmq_setsockopt (s, ZMQ_SNDHWM, &sndbuflen, sndbuflenlen); - if (rc != 0) { - printf ("error in zmq_setsockopt: %s\n", zmq_strerror (errno)); - return -1; - } - - sndbuflen = 2; - rc = zmq_setsockopt (s, ZMQ_RCVHWM, &sndbuflen, sndbuflenlen); - if (rc != 0) { - printf ("error in zmq_setsockopt: %s\n", zmq_strerror (errno)); - return -1; - } - - - rc = zmq_getsockopt (s, ZMQ_RCVBUF, &rcvbuflen, &rcvbuflenlen); - if (rc != 0) { - printf ("error in zmq_getsockopt: %s\n", zmq_strerror (errno)); - return -1; - } - rc = zmq_getsockopt (s, ZMQ_SNDBUF, &sndbuflen, &sndbuflenlen); - if (rc != 0) { - printf ("error in zmq_getsockopt: %s\n", zmq_strerror (errno)); - return -1; - } - - printf("RCVBUF=%d KB SNDBUF=%d KB adjusted\n", rcvbuflen/1024, sndbuflen/1024); - - printf("Threads: %d\n", zmq_ctx_get( ctx, ZMQ_IO_THREADS)); - rc = zmq_bind (s, bind_to); if (rc != 0) { printf ("error in zmq_bind: %s\n", zmq_strerror (errno)); return -1; } - if( !(buf = malloc( message_size))){ perror("malloc"); return -1;} - - printf("%sING %s...\n", flow == ZMQ_PUSH ? "PUSH":"PULL", rec ? "ZMQ_MSG":"DATA"); - - tm_init( &timer); - - if( flow == ZMQ_PULL){ - - if( rec == ZMSG){ - - zmq_msg_t msg; - - rc = zmq_msg_init (&msg); - if (rc != 0) { - printf ("error in zmq_msg_init: %s\n", zmq_strerror (errno)); - return -1; - } - for (i = 0; i != message_count; i++) { - rc = zmq_msg_recv (&msg, s, 0); - if (rc < 0) { - printf ("error in zmq_recv: %s\n", zmq_strerror (errno)); - return -1; - } - } - rc = zmq_msg_close (&msg); - if (rc != 0) { - printf ("error in zmq_msg_close: %s\n", zmq_strerror (errno)); - return -1; - }} - - else{ - - for (i = 0; i != message_count; i++) { - rc = zmq_recv( s, buf, message_size, 0); - if (rc < 0) { - printf ("error in zmq_recv: %s\n", zmq_strerror (errno)); - return -1; - } - }} - } - else{ - - if( rec == ZMSG){ - - zmq_msg_t msg; - - for (i = 0; i != message_count; i++) { - - rc = zmq_msg_init_data (&msg, buf, message_size, NULL, NULL); - if (rc != 0) { - printf ("error in zmq_msg_init_data: %s\n", zmq_strerror (errno)); - return -1; - } - - rc = zmq_msg_send( &msg, s, 0); - if (rc < 0) { - printf ("error in zmq_send: %s\n", zmq_strerror (errno)); - return -1; - } - }} - - - else{ - for (i = 0; i != message_count; i++){ - rc = zmq_send( s, buf, message_size, 0); - if (rc < 0) { - printf ("error in zmq_send: %s\n", zmq_strerror (errno)); - return -1; - } - }} + rc = zmq_msg_init (&msg); + if (rc != 0) { + printf ("error in zmq_msg_init: %s\n", zmq_strerror (errno)); + return -1; } - float secs = tm_secs( &timer); - float total = (((float) message_count) * ((float) message_size)) / (1024.0*1024.0*1024.0); + rc = zmq_recvmsg (s, &msg, 0); + if (rc < 0) { + printf ("error in zmq_recvmsg: %s\n", zmq_strerror (errno)); + return -1; + } + if (zmq_msg_size (&msg) != message_size) { + printf ("message of incorrect size received\n"); + return -1; + } - printf ("Message size: %d KBytes, time: %f secs\n", (int) message_size/1024, secs); - printf ("%sed %.3f GB @ %.3f GB/s\n", (flow == ZMQ_PULL) ? "Pull":"Push", total, total/secs); + watch = zmq_stopwatch_start (); + for (i = 0; i != message_count - 1; i++) { + rc = zmq_recvmsg (s, &msg, 0); + if (rc < 0) { + printf ("error in zmq_recvmsg: %s\n", zmq_strerror (errno)); + return -1; + } + if (zmq_msg_size (&msg) != message_size) { + printf ("message of incorrect size received\n"); + return -1; + } + } + + elapsed = zmq_stopwatch_stop (watch); + if (elapsed == 0) + elapsed = 1; + + rc = zmq_msg_close (&msg); + if (rc != 0) { + printf ("error in zmq_msg_close: %s\n", zmq_strerror (errno)); + return -1; + } + + throughput = (unsigned long) + ((double) message_count / (double) elapsed * 1000000); + megabits = (double) (throughput * message_size * 8) / 1000000; + + printf ("message size: %d [B]\n", (int) message_size); + printf ("message count: %d\n", (int) message_count); + printf ("mean throughput: %d [msg/s]\n", (int) throughput); + printf ("mean throughput: %.3f [Mb/s]\n", (double) megabits); rc = zmq_close (s); if (rc != 0) { @@ -316,7 +127,5 @@ int main (int argc, char *argv []) return -1; } - if( buf) free( buf); - return 0; } diff --git a/perf/remote_thr.cpp b/perf/remote_thr.cpp index 74e22e31..a4a7b6a3 100644 --- a/perf/remote_thr.cpp +++ b/perf/remote_thr.cpp @@ -1,7 +1,5 @@ /* - Copyright (c) 2009-2011 250bpm s.r.o. - Copyright (c) 2007-2009 iMatix Corporation - Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file + Copyright (c) 2007-2013 Contributors as noted in the AUTHORS file This file is part of 0MQ. @@ -24,337 +22,72 @@ #include #include #include -#include -#include -#include "platform.hpp" - -#if defined ZMQ_HAVE_WINDOWS -#include -#include -#else -#include -#include -#include -#endif - -#define ZMSG 1 -#define DATA 0 - -const char *connect_to; -int message_count = 1000; -int message_size = 1024; -int threads = 1; -int workers = 1; -int sndbuflen = 128*256; -int rcvbuflen = 128*256; -int flow = ZMQ_PUSH; -int rec = DATA; - -typedef struct US_TIMER US_TIMER; - -struct US_TIMER{ - - struct timeval time_was; - struct timeval time_now; -}; -/* Records the current timer state -*/ -void tm_init( US_TIMER *t){ -#if defined ZMQ_HAVE_WINDOWS - - // Get the high resolution counter's accuracy. - LARGE_INTEGER ticksPerSecond; - QueryPerformanceFrequency (&ticksPerSecond); - - // What time is it? - LARGE_INTEGER tick; - if ( !QueryPerformanceCounter (&tick) ) { perror( "tm_init()" ); } - - // Seconds - t->time_now.tv_sec = (long)( tick.QuadPart / ticksPerSecond.QuadPart ); - // Microseconds - t->time_now.tv_usec = (long)( ( tick.QuadPart - t->time_now.tv_sec * ticksPerSecond.QuadPart ) * 1000000 / ticksPerSecond.QuadPart ); -#else - if( gettimeofday( &t->time_now, NULL) < 0){ perror( "tm_init()");} -#endif - - t->time_was = t->time_now; -} - -/* Returns the time passed in microsecond precision in seconds since last init - of timer. -*/ -float tm_secs( US_TIMER *t){ - - register float seconds; - -#if defined ZMQ_HAVE_WINDOWS - // Get the high resolution counter's accuracy. - LARGE_INTEGER ticksPerSecond; - QueryPerformanceFrequency (&ticksPerSecond); - - // What time is it? - LARGE_INTEGER tick; - if ( !QueryPerformanceCounter (&tick) ) { perror( "tm_secs()" ); } - - // Seconds - t->time_now.tv_sec = (long)( tick.QuadPart / ticksPerSecond.QuadPart ); - // Microseconds - t->time_now.tv_usec = (long)( ( tick.QuadPart - t->time_now.tv_sec * ticksPerSecond.QuadPart ) * 1000000 / ticksPerSecond.QuadPart ); -#else - if( gettimeofday( &t->time_now, NULL) < 0){ perror( "tm_secs()");} -#endif - seconds = ( ((float)( t->time_now.tv_sec - t->time_was.tv_sec)) + - (((float)( t->time_now.tv_usec - t->time_was.tv_usec)) / 1000000.0)); - - t->time_was = t->time_now; - - return seconds; -} - -void my_free (void *data, void *hint) +int main (int argc, char *argv []) { - //free (data); -} + const char *connect_to; + int message_count; + int message_size; + void *ctx; + void *s; + int rc; + int i; + zmq_msg_t msg; -#if defined ZMQ_HAVE_WINDOWS -static unsigned int __stdcall worker_routine (void *ctx) -#else -static void *worker_routine (void *ctx) -#endif -{ - int rc,i; - void *buf = NULL; + if (argc != 4) { + printf ("usage: remote_thr " + "\n"); + return 1; + } + connect_to = argv [1]; + message_size = atoi (argv [2]); + message_count = atoi (argv [3]); - if( !(buf = malloc( message_size))){ perror("malloc"); return 0;} - - void *s = zmq_socket (ctx, flow); + ctx = zmq_init (1); + if (!ctx) { + printf ("error in zmq_init: %s\n", zmq_strerror (errno)); + return -1; + } + s = zmq_socket (ctx, ZMQ_PUSH); if (!s) { printf ("error in zmq_socket: %s\n", zmq_strerror (errno)); - return 0; + return -1; } // Add your socket options here. // For example ZMQ_RATE, ZMQ_RECOVERY_IVL and ZMQ_MCAST_LOOP for PGM. - size_t rcvbuflenlen = (size_t)sizeof rcvbuflen; - size_t sndbuflenlen = (size_t)sizeof sndbuflen; - - rc = zmq_setsockopt (s, ZMQ_RCVBUF, &rcvbuflen, rcvbuflenlen); - if (rc != 0) { - printf ("error in zmq_setsockopt: %s\n", zmq_strerror (errno)); - return 0; - } - rc = zmq_setsockopt (s, ZMQ_SNDBUF, &sndbuflen, sndbuflenlen); - if (rc != 0) { - printf ("error in zmq_setsockopt: %s\n", zmq_strerror (errno)); - return 0; - } - - sndbuflen = 1; - rc = zmq_setsockopt (s, ZMQ_DELAY_ATTACH_ON_CONNECT, &sndbuflen, sndbuflenlen); - if (rc != 0) { - printf ("error in zmq_setsockopt: %s\n", zmq_strerror (errno)); - return 0; - } - - sndbuflen = 2; - rc = zmq_setsockopt (s, ZMQ_SNDHWM, &sndbuflen, sndbuflenlen); - if (rc != 0) { - printf ("error in zmq_setsockopt: %s\n", zmq_strerror (errno)); - return 0; - } - - sndbuflen = 2; - rc = zmq_setsockopt (s, ZMQ_RCVHWM, &sndbuflen, sndbuflenlen); - if (rc != 0) { - printf ("error in zmq_setsockopt: %s\n", zmq_strerror (errno)); - return 0; - } - - rc = zmq_getsockopt (s, ZMQ_RCVBUF, &rcvbuflen, &rcvbuflenlen); - if (rc != 0) { - printf ("error in zmq_getsockopt: %s\n", zmq_strerror (errno)); - return 0; - } - rc = zmq_getsockopt (s, ZMQ_SNDBUF, &sndbuflen, &sndbuflenlen); - if (rc != 0) { - printf ("error in zmq_getsockopt: %s\n", zmq_strerror (errno)); - return 0; - } - - printf("RCVBUF=%d KB SNDBUF=%d KB adjusted\n", rcvbuflen/1024, sndbuflen/1024); - - rc = zmq_connect (s, connect_to); if (rc != 0) { printf ("error in zmq_connect: %s\n", zmq_strerror (errno)); - return 0; + return -1; } - printf("%sING %s...\n", flow == ZMQ_PUSH ? "PUSH":"PULL", rec ? "ZMQ_MSG":"DATA"); - - if( flow == ZMQ_PUSH){ - - if( rec == ZMSG){ - - zmq_msg_t msg; - - for (i = 0; i != message_count; i++) { - - rc = zmq_msg_init_data (&msg, buf, message_size, NULL, NULL); - if (rc != 0) { - printf ("error in zmq_msg_init_data: %s\n", zmq_strerror (errno)); - return 0; - } - - rc = zmq_msg_send( &msg, s, 0); - if (rc < 0) { - printf ("error in zmq_send: %s\n", zmq_strerror (errno)); - return 0; - } - - rc = zmq_msg_close (&msg); - if (rc != 0) { - printf ("error in zmq_msg_close: %s\n", zmq_strerror (errno)); - exit (1); - } - }} - - else{ - - for (i = 0; i != message_count; i++) { - rc = zmq_send( s, buf, message_size, 0); - if (rc < 0) { - printf ("error in zmq_send: %s\n", zmq_strerror (errno)); - return 0; - } - }} - - } - else{ - - if( rec == ZMSG){ - - zmq_msg_t msg; - - rc = zmq_msg_init (&msg); + for (i = 0; i != message_count; i++) { + rc = zmq_msg_init_size (&msg, message_size); if (rc != 0) { - printf ("error in zmq_msg_init: %s\n", zmq_strerror (errno)); - return 0; + printf ("error in zmq_msg_init_size: %s\n", zmq_strerror (errno)); + return -1; + } + rc = zmq_sendmsg (s, &msg, 0); + if (rc < 0) { + printf ("error in zmq_sendmsg: %s\n", zmq_strerror (errno)); + return -1; + } + rc = zmq_msg_close (&msg); + if (rc != 0) { + printf ("error in zmq_msg_close: %s\n", zmq_strerror (errno)); + return -1; } - for (i = 0; i != message_count; i++) { - rc = zmq_msg_recv (&msg, s, 0); - if (rc < 0) { - printf ("error in zmq_recv: %s\n", zmq_strerror (errno)); - return 0; - } - }} - - else{ - - for (i = 0; i != message_count; i++) { - rc = zmq_recv( s, buf, message_size, 0); - if (rc < 0) { - printf ("error in zmq_recv: %s\n", zmq_strerror (errno)); - return 0; - } - }} } rc = zmq_close (s); if (rc != 0) { printf ("error in zmq_close: %s\n", zmq_strerror (errno)); - return 0; - } - - free( buf); - - return 0; -} - -int main (int argc, char *argv []) -{ - void *ctx; - int rc; - int i; - - if (argc != 10) { - printf ("usage: remote_thr \n"); - return 1; - } - - connect_to = argv [1]; - message_size = atoi (argv [2]); - message_count = atoi (argv [3]); - sndbuflen = atoi (argv [4]); - rcvbuflen = atoi (argv [5]); - if( !strcmp( argv [6], "PUSH")){ - flow = ZMQ_PUSH; - } - if( !strcmp( argv [6], "PULL")){ - flow = ZMQ_PULL; - } - if( !strcmp( argv [7], "ZMSG")){ - rec = ZMSG; - } - if( !strcmp( argv [7], "DATA")){ - rec = DATA; - } - threads = atoi (argv [8]); - workers = atoi (argv [9]); - - - ctx = zmq_ctx_new (); - if (!ctx) { - printf ("error in zmq_ctx_new: %s\n", zmq_strerror (errno)); return -1; } - rc = zmq_ctx_set ( ctx, ZMQ_IO_THREADS, threads); - if (rc) { - printf ("error in zmq_ctx_set: %s\n", zmq_strerror (errno)); - return -1; - } - - printf("Threads: %d, workers %d\n", zmq_ctx_get( ctx, ZMQ_IO_THREADS), workers); -#if defined ZMQ_HAVE_WINDOWS - HANDLE worker[128]; -#else - pthread_t worker[128]; -#endif - - US_TIMER timer; - - tm_init( &timer); - - for (i = 0; i < workers; i++) { -#if defined ZMQ_HAVE_WINDOWS - worker[i] = (HANDLE) _beginthreadex (NULL, 0, worker_routine, ctx, 0 , NULL); -#else - pthread_create (&worker[i], NULL, worker_routine, ctx); -#endif - printf("Worker %d spawned\n", i); - } - - for (i = 0; i < workers; i++) { -#if defined ZMQ_HAVE_WINDOWS - WaitForSingleObject (worker[i], INFINITE); - CloseHandle (worker[i]); -#else - pthread_join( worker[i], NULL); -#endif - printf("Worker %d joined\n", i); - } - - float secs = tm_secs( &timer); - float total = ( (float)workers)*(((float) message_count) * ((float) message_size)) / (1024.0*1024.0*1024.0); - - printf ("Message: size: %d KBytes, count: %d/workers(%d), time: %f secs\n", (int) message_size/1024, message_count, workers, secs); - printf ("%sed %.3f GB @ %.3f GB/s\n", (flow == ZMQ_PULL) ? "Pull":"Push", total, total/secs); - rc = zmq_term (ctx); if (rc != 0) { printf ("error in zmq_term: %s\n", zmq_strerror (errno)); @@ -363,4 +96,3 @@ int main (int argc, char *argv []) return 0; } -