added newly added socket options to all language bindings, P2P model changed to PUB/SUB for throughput tests

This commit is contained in:
malosek 2009-09-16 12:22:36 +02:00
parent 9c522dccaf
commit 7a5db6041f
16 changed files with 144 additions and 66 deletions

View File

@ -53,7 +53,7 @@ extern "C" {
#define ZMQ_UNSUBSCRIBE 7 // string
#define ZMQ_RATE 8 // int64_t
#define ZMQ_RECOVERY_IVL 9 // int64_t
#define ZMQ_MCAST_LOOP 10 // boolean
#define ZMQ_MCAST_LOOP 10 // int64_t
// The operation should be performed in non-blocking mode. I.e. if it cannot
// be processed immediately, error should be returned with errno set to EAGAIN.

View File

@ -98,6 +98,7 @@ JNIEXPORT void JNICALL Java_org_zmq_Socket_setsockopt__IJ (JNIEnv *env,
case ZMQ_AFFINITY:
case ZMQ_RATE:
case ZMQ_RECOVERY_IVL:
case ZMQ_MCAST_LOOP:
{
void *s = (void*) env->GetLongField (obj, socket_handle_fid);
assert (s);

View File

@ -44,6 +44,7 @@ public class Socket
public static final int UNSUBSCRIBE = 7;
public static final int RATE = 8;
public static final int RECOVERY_IVL = 9;
public static final int MCAST_LOOP = 10;
/**
* Class constructor.

View File

@ -48,9 +48,15 @@ int main (int argc, char *argv [])
ctx = zmq_init (1, 1);
assert (ctx);
s = zmq_socket (ctx, ZMQ_P2P);
s = zmq_socket (ctx, ZMQ_SUB);
assert (s);
rc = zmq_setsockopt (s, ZMQ_SUBSCRIBE , "*", 1);
assert (rc == 0);
// Add your socket options here.
// For example ZMQ_RATE, ZMQ_RECOVERY_IVL and ZMQ_MCAST_LOOP for PGM.
rc = zmq_bind (s, bind_to);
assert (rc == 0);

View File

@ -45,9 +45,12 @@ int main (int argc, char *argv [])
ctx = zmq_init (1, 1);
assert (ctx);
s = zmq_socket (ctx, ZMQ_P2P);
s = zmq_socket (ctx, ZMQ_PUB);
assert (s);
// Add your socket options here.
// For example ZMQ_RATE, ZMQ_RECOVERY_IVL and ZMQ_MCAST_LOOP for PGM.
rc = zmq_connect (s, connect_to);
assert (rc == 0);

View File

@ -36,7 +36,13 @@ int main (int argc, char *argv [])
zmq::context_t ctx (1, 1);
zmq::socket_t s (ctx, ZMQ_P2P);
zmq::socket_t s (ctx, ZMQ_SUB);
s.setsockopt (ZMQ_SUBSCRIBE , "*", 1);
// Add your socket options here.
// For example ZMQ_RATE, ZMQ_RECOVERY_IVL and ZMQ_MCAST_LOOP for PGM.
s.bind (bind_to);
zmq::message_t msg;

View File

@ -36,7 +36,11 @@ int main (int argc, char *argv [])
zmq::context_t ctx (1, 1);
zmq::socket_t s (ctx, ZMQ_P2P);
zmq::socket_t s (ctx, ZMQ_PUB);
// Add your socket options here.
// For example ZMQ_RATE, ZMQ_RECOVERY_IVL and ZMQ_MCAST_LOOP for PGM.
s.connect (connect_to);
for (int i = 0; i != message_count; i++) {

View File

@ -21,45 +21,51 @@ import org.zmq.*;
class local_thr
{
public static void main (String [] args)
{
if (args.length != 3) {
System.out.println ("usage: local_thr <bind-to> " +
"<message size> <message count>");
return;
}
public static void main (String [] args)
{
if (args.length != 3) {
System.out.println ("usage: local_thr <bind-to> " +
"<message size> <message count>");
return;
}
String bindTo = args [0];
long messageSize = Integer.parseInt (args [1]);
long messageCount = Integer.parseInt (args [2]);
String bindTo = args [0];
long messageSize = Integer.parseInt (args [1]);
long messageCount = Integer.parseInt (args [2]);
org.zmq.Context ctx = new org.zmq.Context (1, 1);
org.zmq.Context ctx = new org.zmq.Context (1, 1);
org.zmq.Socket s = new org.zmq.Socket (ctx, org.zmq.Socket.P2P);
s.bind (bindTo);
org.zmq.Socket s = new org.zmq.Socket (ctx, org.zmq.Socket.SUB);
byte [] data = s.recv (0);
assert (data.length == messageSize);
s.setsockopt (org.zmq.Socket.SUBSCRIBE , "*");
long start = System.currentTimeMillis ();
// Add your socket options here.
// For example ZMQ_RATE, ZMQ_RECOVERY_IVL and ZMQ_MCAST_LOOP for PGM.
for (int i = 1; i != messageCount; i ++) {
data = s.recv (0);
assert (data.length == messageSize);
}
s.bind (bindTo);
long end = System.currentTimeMillis ();
byte [] data = s.recv (0);
assert (data.length == messageSize);
long elapsed = (end - start) * 1000;
if (elapsed == 0)
elapsed = 1;
long start = System.currentTimeMillis ();
long throughput = messageCount * 1000000 / elapsed;
double megabits = (double) (throughput * messageSize * 8) / 1000000;
for (int i = 1; i != messageCount; i ++) {
data = s.recv (0);
assert (data.length == messageSize);
}
System.out.println ("message size: " + messageSize + " [B]");
System.out.println ("message count: " + messageCount);
System.out.println ("mean throughput: " + throughput + "[msg/s]");
System.out.println ("mean throughput: " + megabits + "[Mb/s]");
}
long end = System.currentTimeMillis ();
long elapsed = (end - start) * 1000;
if (elapsed == 0)
elapsed = 1;
long throughput = messageCount * 1000000 / elapsed;
double megabits = (double) (throughput * messageSize * 8) / 1000000;
System.out.println ("message size: " + messageSize + " [B]");
System.out.println ("message count: " + messageCount);
System.out.println ("mean throughput: " + throughput + "[msg/s]");
System.out.println ("mean throughput: " + megabits + "[Mb/s]");
}
}

View File

@ -21,33 +21,37 @@ import org.zmq.*;
class remote_thr
{
public static void main (String [] args)
{
if (args.length != 3) {
System.out.println ("usage: remote_thr <connect-to> " +
"<message-size> <message-count>");
return;
}
public static void main (String [] args)
{
if (args.length != 3) {
System.out.println ("usage: remote_thr <connect-to> " +
"<message-size> <message-count>");
return;
}
// Parse the command line arguments.
String connectTo = args [0];
int messageSize = Integer.parseInt (args [1]);
int messageCount = Integer.parseInt (args [2]);
// Parse the command line arguments.
String connectTo = args [0];
int messageSize = Integer.parseInt (args [1]);
int messageCount = Integer.parseInt (args [2]);
org.zmq.Context ctx = new org.zmq.Context (1, 1);
org.zmq.Context ctx = new org.zmq.Context (1, 1);
org.zmq.Socket s = new org.zmq.Socket (ctx, org.zmq.Socket.P2P);
s.connect (connectTo);
org.zmq.Socket s = new org.zmq.Socket (ctx, org.zmq.Socket.PUB);
byte msg [] = new byte [messageSize];
for (int i = 0; i != messageCount; i++)
s.send (msg, 0);
// Add your socket options here.
// For example ZMQ_RATE, ZMQ_RECOVERY_IVL and ZMQ_MCAST_LOOP for PGM.
try {
Thread.sleep (10000);
}
catch (InterruptedException e) {
e.printStackTrace ();
}
s.connect (connectTo);
byte msg [] = new byte [messageSize];
for (int i = 0; i != messageCount; i++)
s.send (msg, 0);
try {
Thread.sleep (10000);
}
catch (InterruptedException e) {
e.printStackTrace ();
}
}
}

View File

@ -35,7 +35,13 @@ def main ():
sys.exit (1)
ctx = libpyzmq.Context (1, 1);
s = libpyzmq.Socket (ctx, libpyzmq.P2P)
s = libpyzmq.Socket (ctx, libpyzmq.SUB)
s.setsockopt (libpyzmq.SUBSCRIBE , "*");
# Add your socket options here.
# For example ZMQ_RATE, ZMQ_RECOVERY_IVL and ZMQ_MCAST_LOOP for PGM.
s.bind (bind_to)
msg = s.recv ()

View File

@ -35,7 +35,11 @@ def main ():
sys.exit (1)
ctx = libpyzmq.Context (1, 1);
s = libpyzmq.Socket (ctx, libpyzmq.P2P)
s = libpyzmq.Socket (ctx, libpyzmq.PUB)
# Add your socket options here.
# For example ZMQ_RATE, ZMQ_RECOVERY_IVL and ZMQ_MCAST_LOOP for PGM.
s.connect (connect_to)
msg = ''.join ([' ' for n in range (0, message_size)])

View File

@ -28,7 +28,12 @@ message_size = ARGV[1].to_i
message_count = ARGV[2].to_i
ctx = Context.new(1, 1)
s = Socket.new(ctx, P2P);
s = Socket.new(ctx, SUB);
s.setsockopt (SUBSCRIBE, "*");
# Add your socket options here.
# For example ZMQ_RATE, ZMQ_RECOVERY_IVL and ZMQ_MCAST_LOOP for PGM.
s.bind(bind_to);
msg = s.recv(0)

View File

@ -28,7 +28,11 @@ message_size = ARGV[1].to_i
message_count = ARGV[2].to_i
ctx = Context.new(1, 1)
s = Socket.new(ctx, P2P);
s = Socket.new(ctx, PUB);
# Add your socket options here.
# For example ZMQ_RATE, ZMQ_RECOVERY_IVL and ZMQ_MCAST_LOOP for PGM.
s.connect(connect_to);
msg = "#{'0'*message_size}"

View File

@ -453,7 +453,7 @@ PyMODINIT_FUNC initlibpyzmq ()
PyObject *dict = PyModule_GetDict (module);
assert (dict);
PyObject *t;
PyObject *t;
t = PyInt_FromLong (ZMQ_NOBLOCK);
PyDict_SetItemString (dict, "NOBLOCK", t);
Py_DECREF (t);
@ -489,7 +489,23 @@ PyMODINIT_FUNC initlibpyzmq ()
Py_DECREF (t);
t = PyInt_FromLong (ZMQ_IDENTITY);
PyDict_SetItemString (dict, "IDENTITY", t);
Py_DECREF (t);
t = PyInt_FromLong (ZMQ_SUBSCRIBE);
PyDict_SetItemString (dict, "SUBSCRIBE", t);
Py_DECREF (t);
t = PyInt_FromLong (ZMQ_UNSUBSCRIBE);
PyDict_SetItemString (dict, "UNSUBSCRIBE", t);
Py_DECREF (t);
t = PyInt_FromLong (ZMQ_RATE);
PyDict_SetItemString (dict, "RATE", t);
Py_DECREF (t);
t = PyInt_FromLong (ZMQ_RECOVERY_IVL);
PyDict_SetItemString (dict, "RECOVERY_IVL", t);
Py_DECREF (t);
t = PyInt_FromLong (ZMQ_MCAST_LOOP);
PyDict_SetItemString (dict, "MCAST_LOOP", t);
Py_DECREF (t);
}
#if defined _MSC_VER

View File

@ -282,6 +282,11 @@ extern "C" void Init_librbzmq ()
rb_define_global_const ("SWAP", INT2NUM (ZMQ_SWAP));
rb_define_global_const ("AFFINITY", INT2NUM (ZMQ_AFFINITY));
rb_define_global_const ("IDENTITY", INT2NUM (ZMQ_IDENTITY));
rb_define_global_const ("SUBSCRIBE", INT2NUM (ZMQ_SUBSCRIBE));
rb_define_global_const ("UNSUBSCRIBE", INT2NUM (ZMQ_UNSUBSCRIBE));
rb_define_global_const ("RATE", INT2NUM (ZMQ_RATE));
rb_define_global_const ("RECOVERY_IVL", INT2NUM (ZMQ_RECOVERY_IVL));
rb_define_global_const ("MCAST_LOOP", INT2NUM (ZMQ_MCAST_LOOP));
rb_define_global_const ("NOBLOCK", INT2NUM (ZMQ_NOBLOCK));
rb_define_global_const ("NOFLUSH", INT2NUM (ZMQ_NOFLUSH));

View File

@ -158,11 +158,18 @@ int zmq::socket_base_t::setsockopt (int option_, const void *optval_,
return 0;
case ZMQ_MCAST_LOOP:
if (optvallen_ != sizeof (bool)) {
if (optvallen_ != sizeof (int64_t)) {
errno = EINVAL;
return -1;
}
if ((int64_t) *((int64_t*) optval_) == 0 ||
(int64_t) *((int64_t*) optval_) == 1) {
options.use_multicast_loop = (bool) *((int64_t*) optval_);
} else {
errno = EINVAL;
return -1;
}
options.use_multicast_loop = optval_;
return 0;
default: