diff --git a/perf/remote_thr.cpp b/perf/remote_thr.cpp index 2f98bd4e..c2ce375e 100644 --- a/perf/remote_thr.cpp +++ b/perf/remote_thr.cpp @@ -28,7 +28,15 @@ #include #include #include + +#include "platform.hpp" + +#if defined ZMQ_HAVE_WINDOWS +#include +#include +#else #include +#endif #define ZMSG 1 #define DATA 0 @@ -82,18 +90,22 @@ void my_free (void *data, void *hint) //free (data); } -static void *worker_routine (void *ctx) { - +#if defined ZMQ_HAVE_WINDOWS +static unsigned int __stdcall worker_routine (void *ctx) +#else +static void *worker_routine (void *ctx) +#endif +{ int rc,i; void *buf = NULL; - if( !(buf = malloc( message_size))){ perror("malloc"); return NULL;} + if( !(buf = malloc( message_size))){ perror("malloc"); return 0;} void *s = zmq_socket (ctx, flow); if (!s) { printf ("error in zmq_socket: %s\n", zmq_strerror (errno)); - return NULL; + return 0; } // Add your socket options here. @@ -105,44 +117,44 @@ static void *worker_routine (void *ctx) { rc = zmq_setsockopt (s, ZMQ_RCVBUF, &rcvbuflen, rcvbuflenlen); if (rc != 0) { printf ("error in zmq_setsockopt: %s\n", zmq_strerror (errno)); - return NULL; + return 0; } rc = zmq_setsockopt (s, ZMQ_SNDBUF, &sndbuflen, sndbuflenlen); if (rc != 0) { printf ("error in zmq_setsockopt: %s\n", zmq_strerror (errno)); - return NULL; + return 0; } sndbuflen = 1; rc = zmq_setsockopt (s, ZMQ_DELAY_ATTACH_ON_CONNECT, &sndbuflen, sndbuflenlen); if (rc != 0) { printf ("error in zmq_setsockopt: %s\n", zmq_strerror (errno)); - return NULL; + return 0; } sndbuflen = 2; rc = zmq_setsockopt (s, ZMQ_SNDHWM, &sndbuflen, sndbuflenlen); if (rc != 0) { printf ("error in zmq_setsockopt: %s\n", zmq_strerror (errno)); - return NULL; + return 0; } sndbuflen = 2; rc = zmq_setsockopt (s, ZMQ_RCVHWM, &sndbuflen, sndbuflenlen); if (rc != 0) { printf ("error in zmq_setsockopt: %s\n", zmq_strerror (errno)); - return NULL; + return 0; } rc = zmq_getsockopt (s, ZMQ_RCVBUF, &rcvbuflen, &rcvbuflenlen); if (rc != 0) { printf ("error in zmq_getsockopt: %s\n", zmq_strerror (errno)); - return NULL; + return 0; } rc = zmq_getsockopt (s, ZMQ_SNDBUF, &sndbuflen, &sndbuflenlen); if (rc != 0) { printf ("error in zmq_getsockopt: %s\n", zmq_strerror (errno)); - return NULL; + return 0; } printf("RCVBUF=%d KB SNDBUF=%d KB adjusted\n", rcvbuflen/1024, sndbuflen/1024); @@ -151,7 +163,7 @@ static void *worker_routine (void *ctx) { rc = zmq_connect (s, connect_to); if (rc != 0) { printf ("error in zmq_connect: %s\n", zmq_strerror (errno)); - return NULL; + return 0; } printf("%sING %s...\n", flow == ZMQ_PUSH ? "PUSH":"PULL", rec ? "ZMQ_MSG":"DATA"); @@ -167,13 +179,13 @@ static void *worker_routine (void *ctx) { rc = zmq_msg_init_data (&msg, buf, message_size, NULL, NULL); if (rc != 0) { printf ("error in zmq_msg_init_data: %s\n", zmq_strerror (errno)); - return NULL; + return 0; } rc = zmq_msg_send( &msg, s, 0); if (rc < 0) { printf ("error in zmq_send: %s\n", zmq_strerror (errno)); - return NULL; + return 0; } rc = zmq_msg_close (&msg); @@ -189,7 +201,7 @@ static void *worker_routine (void *ctx) { rc = zmq_send( s, buf, message_size, 0); if (rc < 0) { printf ("error in zmq_send: %s\n", zmq_strerror (errno)); - return NULL; + return 0; } }} @@ -203,13 +215,13 @@ static void *worker_routine (void *ctx) { rc = zmq_msg_init (&msg); if (rc != 0) { printf ("error in zmq_msg_init: %s\n", zmq_strerror (errno)); - return NULL; + return 0; } for (i = 0; i != message_count; i++) { rc = zmq_msg_recv (&msg, s, 0); if (rc < 0) { printf ("error in zmq_recv: %s\n", zmq_strerror (errno)); - return NULL; + return 0; } }} @@ -219,7 +231,7 @@ static void *worker_routine (void *ctx) { rc = zmq_recv( s, buf, message_size, 0); if (rc < 0) { printf ("error in zmq_recv: %s\n", zmq_strerror (errno)); - return NULL; + return 0; } }} } @@ -227,12 +239,12 @@ static void *worker_routine (void *ctx) { rc = zmq_close (s); if (rc != 0) { printf ("error in zmq_close: %s\n", zmq_strerror (errno)); - return NULL; + return 0; } free( buf); - return NULL; + return 0; } int main (int argc, char *argv []) @@ -240,7 +252,6 @@ int main (int argc, char *argv []) void *ctx; int rc; int i; - void *p; if (argc != 10) { printf ("usage: remote_thr \n"); @@ -280,21 +291,34 @@ int main (int argc, char *argv []) return -1; } - printf("Threads: %d, workers %d\n", zmq_ctx_get( ctx, ZMQ_IO_THREADS), workers); - pthread_t worker[128]; + printf("Threads: %d, workers %d\n", zmq_ctx_get( ctx, ZMQ_IO_THREADS), workers); +#if defined ZMQ_HAVE_WINDOWS + HANDLE worker[128]; +#else + pthread_t worker[128]; +#endif US_TIMER timer; tm_init( &timer); for (i = 0; i < workers; i++) { +#if defined ZMQ_HAVE_WINDOWS + worker[i] = (HANDLE) _beginthreadex (NULL, 0, worker_routine, ctx, 0 , NULL); +#else pthread_create (&worker[i], NULL, worker_routine, ctx); - printf("Worker %d spawned\n", i); +#endif + printf("Worker %d spawned\n", i); } for (i = 0; i < workers; i++) { +#if defined ZMQ_HAVE_WINDOWS + WaitForSingleObject (worker[i], INFINITE); + CloseHandle (worker[i]); +#else pthread_join( worker[i], &p); - printf("Worker %d joined\n", i); +#endif + printf("Worker %d joined\n", i); } float secs = tm_secs( &timer);