mirror of
https://github.com/zeromq/libzmq.git
synced 2025-05-28 15:14:12 +02:00
Add STREAM connect notification.
Adjust test cases to connection notification. Increase error checking in test cases.
This commit is contained in:
parent
53d0199e50
commit
afb24b53e6
@ -177,6 +177,14 @@ void zmq::stream_engine_t::plug (io_thread_t *io_thread_,
|
||||
|
||||
read_msg = &stream_engine_t::pull_msg_from_session;
|
||||
write_msg = &stream_engine_t::push_msg_to_session;
|
||||
|
||||
// For raw sockets, send an initial 0-length message to the
|
||||
// application so that it knows a peer has connected.
|
||||
msg_t connector;
|
||||
connector.init();
|
||||
int rc = (this->*write_msg) (&connector);
|
||||
connector.close();
|
||||
session->flush ();
|
||||
}
|
||||
else {
|
||||
// Send the 'length' and 'flags' fields of the identity message.
|
||||
|
@ -68,6 +68,7 @@ test_stream_to_dealer (void)
|
||||
rc = zmq_send (dealer, "Hello", 5, 0);
|
||||
assert (rc == 5);
|
||||
|
||||
// Connecting sends a zero message
|
||||
// First frame is identity
|
||||
zmq_msg_t identity;
|
||||
rc = zmq_msg_init (&identity);
|
||||
@ -76,9 +77,19 @@ test_stream_to_dealer (void)
|
||||
assert (rc > 0);
|
||||
assert (zmq_msg_more (&identity));
|
||||
|
||||
// Second frame is greeting signature
|
||||
// Second frame is zero
|
||||
byte buffer [255];
|
||||
rc = zmq_recv (stream, buffer, 255, 0);
|
||||
assert (rc == 0);
|
||||
|
||||
// Real data follows
|
||||
// First frame is identity
|
||||
rc = zmq_msg_recv (&identity, stream, 0);
|
||||
assert (rc > 0);
|
||||
assert (zmq_msg_more (&identity));
|
||||
|
||||
// Second frame is greeting signature
|
||||
rc = zmq_recv (stream, buffer, 255, 0);
|
||||
assert (rc == 10);
|
||||
assert (memcmp (buffer, greeting.signature, 10) == 0);
|
||||
|
||||
@ -178,14 +189,26 @@ test_stream_to_stream (void)
|
||||
assert (client);
|
||||
rc = zmq_connect (client, "tcp://localhost:9070");
|
||||
assert (rc == 0);
|
||||
// It would be less surprising to get an empty message instead
|
||||
// of having to fetch the identity like this [PH 2013/06/27]
|
||||
uint8_t id [256];
|
||||
size_t id_size = 256;
|
||||
uint8_t buffer [256];
|
||||
|
||||
// Connecting sends a zero message
|
||||
// Server: First frame is identity, second frame is zero
|
||||
id_size = zmq_recv (server, id, 256, 0);
|
||||
assert (id_size > 0);
|
||||
rc = zmq_recv (server, buffer, 256, 0);
|
||||
assert (rc == 0);
|
||||
// Client: First frame is identity, second frame is zero
|
||||
id_size = zmq_recv (client, id, 256, 0);
|
||||
assert (id_size > 0);
|
||||
rc = zmq_recv (client, buffer, 256, 0);
|
||||
assert (rc == 0);
|
||||
|
||||
// Sent HTTP request on client socket
|
||||
// Get server identity
|
||||
rc = zmq_getsockopt (client, ZMQ_IDENTITY, id, &id_size);
|
||||
assert (rc == 0);
|
||||
|
||||
// Sent HTTP request on client socket
|
||||
// First frame is server identity
|
||||
rc = zmq_send (client, id, id_size, ZMQ_SNDMORE);
|
||||
assert (rc == (int) id_size);
|
||||
@ -196,9 +219,8 @@ test_stream_to_stream (void)
|
||||
// Get HTTP request; ID frame and then request
|
||||
id_size = zmq_recv (server, id, 256, 0);
|
||||
assert (id_size > 0);
|
||||
uint8_t buffer [256];
|
||||
rc = zmq_recv (server, buffer, 256, 0);
|
||||
assert (rc > 0);
|
||||
assert (rc != -1);
|
||||
assert (memcmp (buffer, "GET /\n\n", 7) == 0);
|
||||
|
||||
// Send reply back to client
|
||||
@ -207,8 +229,16 @@ test_stream_to_stream (void)
|
||||
"Content-Type: text/plain\r\n"
|
||||
"\r\n"
|
||||
"Hello, World!";
|
||||
zmq_send (server, id, id_size, ZMQ_SNDMORE);
|
||||
zmq_send (server, http_response, sizeof (http_response), 0);
|
||||
rc = zmq_send (server, id, id_size, ZMQ_SNDMORE);
|
||||
assert (rc != -1);
|
||||
rc = zmq_send (server, http_response, sizeof (http_response), ZMQ_SNDMORE);
|
||||
assert (rc != -1);
|
||||
|
||||
// Send zero to close connection to client
|
||||
rc = zmq_send (server, id, id_size, ZMQ_SNDMORE);
|
||||
assert (rc != -1);
|
||||
rc = zmq_send (server, NULL, 0, ZMQ_SNDMORE);
|
||||
assert (rc != -1);
|
||||
|
||||
// Get reply at client and check that it's complete
|
||||
id_size = zmq_recv (client, id, 256, 0);
|
||||
@ -216,6 +246,22 @@ test_stream_to_stream (void)
|
||||
rc = zmq_recv (client, buffer, 256, 0);
|
||||
assert (rc == sizeof (http_response));
|
||||
assert (memcmp (buffer, http_response, sizeof (http_response)) == 0);
|
||||
|
||||
// // Get disconnection notification
|
||||
// FIXME: why does this block? Bug in STREAM disconnect notification?
|
||||
// id_size = zmq_recv (client, id, 256, 0);
|
||||
// assert (id_size > 0);
|
||||
// rc = zmq_recv (client, buffer, 256, 0);
|
||||
// assert (rc == 0);
|
||||
|
||||
rc = zmq_close (server);
|
||||
assert (rc == 0);
|
||||
|
||||
rc = zmq_close (client);
|
||||
assert (rc == 0);
|
||||
|
||||
rc = zmq_ctx_term (ctx);
|
||||
assert (rc == 0);
|
||||
}
|
||||
|
||||
|
||||
|
@ -68,22 +68,60 @@ int main(int argc, char** argv)
|
||||
rc = zmq_connect (sockets [CLIENT], "tcp://localhost:6666");
|
||||
assert (rc == 0);
|
||||
|
||||
// TODO: wait for client to become ready.
|
||||
// wait for connect notification
|
||||
// Server: Grab the 1st frame (peer identity).
|
||||
zmq_msg_t peer_frame;
|
||||
rc = zmq_msg_init (&peer_frame);
|
||||
assert (rc == 0);
|
||||
rc = zmq_msg_recv (&peer_frame, sockets [SERVER], 0);
|
||||
assert (rc != -1);
|
||||
assert(zmq_msg_size (&peer_frame) > 0);
|
||||
assert (has_more (sockets [SERVER]));
|
||||
|
||||
// Server: Grab the 2nd frame (actual payload).
|
||||
zmq_msg_t data_frame;
|
||||
rc = zmq_msg_init (&data_frame);
|
||||
assert (rc == 0);
|
||||
rc = zmq_msg_recv (&data_frame, sockets [SERVER], 0);
|
||||
assert (rc != -1);
|
||||
assert(zmq_msg_size (&data_frame) == 0);
|
||||
|
||||
// Client: Grab the 1st frame (peer identity).
|
||||
rc = zmq_msg_init (&peer_frame);
|
||||
assert (rc == 0);
|
||||
rc = zmq_msg_recv (&peer_frame, sockets [CLIENT], 0);
|
||||
assert (rc != -1);
|
||||
assert(zmq_msg_size (&peer_frame) > 0);
|
||||
assert (has_more (sockets [CLIENT]));
|
||||
|
||||
// Client: Grab the 2nd frame (actual payload).
|
||||
rc = zmq_msg_init (&data_frame);
|
||||
assert (rc == 0);
|
||||
rc = zmq_msg_recv (&data_frame, sockets [CLIENT], 0);
|
||||
assert (rc != -1);
|
||||
assert(zmq_msg_size (&data_frame) == 0);
|
||||
|
||||
// Send initial message.
|
||||
char blob_data [256];
|
||||
size_t blob_size = sizeof(blob_data);
|
||||
rc = zmq_getsockopt (sockets [CLIENT], ZMQ_IDENTITY, blob_data, &blob_size);
|
||||
assert (rc == 0);
|
||||
assert (rc != -1);
|
||||
assert(blob_size > 0);
|
||||
zmq_msg_t msg;
|
||||
zmq_msg_init_size (&msg, blob_size);
|
||||
rc = zmq_msg_init_size (&msg, blob_size);
|
||||
assert (rc == 0);
|
||||
memcpy (zmq_msg_data (&msg), blob_data, blob_size);
|
||||
zmq_msg_send (&msg, sockets [dialog [0].turn], ZMQ_SNDMORE);
|
||||
zmq_msg_close (&msg);
|
||||
zmq_msg_init_size (&msg, strlen(dialog [0].text)+1);
|
||||
memcpy (zmq_msg_data (&msg), dialog [0].text, strlen(dialog [0].text)+1);
|
||||
zmq_msg_send (&msg, sockets [dialog [0].turn], ZMQ_SNDMORE);
|
||||
zmq_msg_close (&msg);
|
||||
rc = zmq_msg_send (&msg, sockets [dialog [0].turn], ZMQ_SNDMORE);
|
||||
assert (rc != -1);
|
||||
rc = zmq_msg_close (&msg);
|
||||
assert (rc == 0);
|
||||
rc = zmq_msg_init_size (&msg, strlen(dialog [0].text));
|
||||
assert (rc == 0);
|
||||
memcpy (zmq_msg_data (&msg), dialog [0].text, strlen(dialog [0].text));
|
||||
rc = zmq_msg_send (&msg, sockets [dialog [0].turn], ZMQ_SNDMORE);
|
||||
assert (rc != -1);
|
||||
rc = zmq_msg_close (&msg);
|
||||
assert (rc == 0);
|
||||
|
||||
// TODO: make sure this loop doesn't loop forever if something is wrong
|
||||
// with the test (or the implementation).
|
||||
@ -106,47 +144,64 @@ int main(int argc, char** argv)
|
||||
|
||||
// Grab the 1st frame (peer identity).
|
||||
zmq_msg_t peer_frame;
|
||||
zmq_msg_init (&peer_frame);
|
||||
zmq_msg_recv (&peer_frame, sockets [SERVER], 0);
|
||||
rc = zmq_msg_init (&peer_frame);
|
||||
assert (rc == 0);
|
||||
rc = zmq_msg_recv (&peer_frame, sockets [SERVER], 0);
|
||||
assert (rc != -1);
|
||||
assert(zmq_msg_size (&peer_frame) > 0);
|
||||
assert (has_more (sockets [SERVER]));
|
||||
|
||||
// Grab the 2nd frame (actual payload).
|
||||
zmq_msg_t data_frame;
|
||||
zmq_msg_init (&data_frame);
|
||||
zmq_msg_recv (&data_frame, sockets [SERVER], 0);
|
||||
rc = zmq_msg_init (&data_frame);
|
||||
assert (rc == 0);
|
||||
rc = zmq_msg_recv (&data_frame, sockets [SERVER], 0);
|
||||
assert (rc != -1);
|
||||
|
||||
// Make sure payload matches what we expect.
|
||||
const char * const data = (const char*)zmq_msg_data (&data_frame);
|
||||
const int size = zmq_msg_size (&data_frame);
|
||||
int cmp = memcmp(dialog [step].text, data, size);
|
||||
assert (cmp == 0);
|
||||
|
||||
++step;
|
||||
|
||||
// 0-length frame is a disconnection notification. The server
|
||||
// should receive it as the last step in the dialogue.
|
||||
if (size == 0) {
|
||||
printf ("server received disconnection notification!\n");
|
||||
++step;
|
||||
assert (step == steps);
|
||||
}
|
||||
else {
|
||||
printf ("server received %d bytes.\n", size);
|
||||
fprintf(stderr, "size = %d, len = %ld\n", size, strlen(dialog [step].text));
|
||||
assert((size_t)size == strlen(dialog [step].text));
|
||||
int cmp = memcmp(dialog [step].text, data, size);
|
||||
assert (cmp == 0);
|
||||
|
||||
++step;
|
||||
|
||||
assert (step < steps);
|
||||
|
||||
// Prepare the response.
|
||||
zmq_msg_close (&data_frame);
|
||||
zmq_msg_init_size (&data_frame, strlen (dialog [step].text));
|
||||
memcpy (zmq_msg_data (&data_frame), dialog [step].text, zmq_msg_size (&data_frame));
|
||||
rc = zmq_msg_close (&data_frame);
|
||||
assert (rc == 0);
|
||||
rc = zmq_msg_init_size (&data_frame,
|
||||
strlen (dialog [step].text));
|
||||
assert (rc == 0);
|
||||
memcpy (zmq_msg_data (&data_frame), dialog [step].text,
|
||||
zmq_msg_size (&data_frame));
|
||||
|
||||
// Send the response.
|
||||
printf ("server sending %d bytes.\n", (int)zmq_msg_size (&data_frame));
|
||||
zmq_msg_send (&peer_frame, sockets [SERVER], ZMQ_SNDMORE);
|
||||
zmq_msg_send (&data_frame, sockets [SERVER], ZMQ_SNDMORE);
|
||||
printf ("server sending %d bytes.\n",
|
||||
(int)zmq_msg_size (&data_frame));
|
||||
rc = zmq_msg_send (&peer_frame, sockets [SERVER], ZMQ_SNDMORE);
|
||||
assert (rc != -1);
|
||||
rc = zmq_msg_send (&data_frame, sockets [SERVER], ZMQ_SNDMORE);
|
||||
assert (rc != -1);
|
||||
}
|
||||
|
||||
// Release resources.
|
||||
zmq_msg_close (&peer_frame);
|
||||
zmq_msg_close (&data_frame);
|
||||
rc = zmq_msg_close (&peer_frame);
|
||||
assert (rc == 0);
|
||||
rc = zmq_msg_close (&data_frame);
|
||||
assert (rc == 0);
|
||||
}
|
||||
|
||||
// Check for data received by the client.
|
||||
@ -155,18 +210,26 @@ int main(int argc, char** argv)
|
||||
|
||||
// Grab the 1st frame (peer identity).
|
||||
zmq_msg_t peer_frame;
|
||||
zmq_msg_init (&peer_frame);
|
||||
zmq_msg_recv (&peer_frame, sockets [CLIENT], 0);
|
||||
rc = zmq_msg_init (&peer_frame);
|
||||
assert (rc == 0);
|
||||
rc = zmq_msg_recv (&peer_frame, sockets [CLIENT], 0);
|
||||
assert (rc != -1);
|
||||
assert(zmq_msg_size (&peer_frame) > 0);
|
||||
assert (has_more (sockets [CLIENT]));
|
||||
|
||||
// Grab the 2nd frame (actual payload).
|
||||
zmq_msg_t data_frame;
|
||||
zmq_msg_init (&data_frame);
|
||||
zmq_msg_recv (&data_frame, sockets [CLIENT], 0);
|
||||
rc = zmq_msg_init (&data_frame);
|
||||
assert (rc == 0);
|
||||
rc = zmq_msg_recv (&data_frame, sockets [CLIENT], 0);
|
||||
assert (rc != -1);
|
||||
assert(zmq_msg_size (&data_frame) > 0);
|
||||
|
||||
// Make sure payload matches what we expect.
|
||||
const char * const data = (const char*)zmq_msg_data (&data_frame);
|
||||
const int size = zmq_msg_size (&data_frame);
|
||||
fprintf(stderr, "size = %d, len = %ld\n", size, strlen(dialog [step].text));
|
||||
assert((size_t)size == strlen(dialog [step].text));
|
||||
int cmp = memcmp(dialog [step].text, data, size);
|
||||
assert (cmp == 0);
|
||||
|
||||
@ -176,25 +239,34 @@ int main(int argc, char** argv)
|
||||
|
||||
// Prepare the response (next line in the dialog).
|
||||
assert (step < steps);
|
||||
zmq_msg_close (&data_frame);
|
||||
zmq_msg_init_size (&data_frame, strlen (dialog [step].text));
|
||||
rc = zmq_msg_close (&data_frame);
|
||||
assert (rc == 0);
|
||||
rc = zmq_msg_init_size (&data_frame, strlen (dialog [step].text));
|
||||
assert (rc == 0);
|
||||
memcpy (zmq_msg_data (&data_frame), dialog [step].text, zmq_msg_size (&data_frame));
|
||||
|
||||
// Send the response.
|
||||
printf ("client sending %d bytes.\n", (int)zmq_msg_size (&data_frame));
|
||||
zmq_msg_send (&peer_frame, sockets [CLIENT], ZMQ_SNDMORE);
|
||||
zmq_msg_send (&data_frame, sockets [CLIENT], ZMQ_SNDMORE);
|
||||
rc = zmq_msg_send (&peer_frame, sockets [CLIENT], ZMQ_SNDMORE);
|
||||
assert (rc != -1);
|
||||
rc = zmq_msg_send (&data_frame, sockets [CLIENT], ZMQ_SNDMORE);
|
||||
assert (rc != -1);
|
||||
|
||||
// Release resources.
|
||||
zmq_msg_close (&peer_frame);
|
||||
zmq_msg_close (&data_frame);
|
||||
rc = zmq_msg_close (&peer_frame);
|
||||
assert (rc == 0);
|
||||
rc = zmq_msg_close (&data_frame);
|
||||
assert (rc == 0);
|
||||
}
|
||||
}
|
||||
assert (step == steps);
|
||||
|
||||
printf ("Done, exiting now.\n");
|
||||
zmq_close (sockets [CLIENT]);
|
||||
zmq_close (sockets [SERVER]);
|
||||
zmq_ctx_term (context);
|
||||
rc = zmq_close (sockets [CLIENT]);
|
||||
assert (rc == 0);
|
||||
rc = zmq_close (sockets [SERVER]);
|
||||
assert (rc == 0);
|
||||
rc = zmq_ctx_term (context);
|
||||
assert (rc == 0);
|
||||
return 0;
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user