mirror of
https://github.com/zeromq/libzmq.git
synced 2025-10-26 02:18:06 +01:00
Merge pull request #452 from hshardeesi/master
New socket option (ZMQ_ROUTER_RAW_SOCK) for ZMQ_ROUTER sockets
This commit is contained in:
@@ -42,6 +42,8 @@
|
||||
#include "decoder.hpp"
|
||||
#include "v1_encoder.hpp"
|
||||
#include "v1_decoder.hpp"
|
||||
#include "raw_decoder.hpp"
|
||||
#include "raw_encoder.hpp"
|
||||
#include "config.hpp"
|
||||
#include "err.hpp"
|
||||
#include "ip.hpp"
|
||||
@@ -133,13 +135,26 @@ void zmq::stream_engine_t::plug (io_thread_t *io_thread_,
|
||||
io_object_t::plug (io_thread_);
|
||||
handle = add_fd (s);
|
||||
|
||||
// Send the 'length' and 'flags' fields of the identity message.
|
||||
// The 'length' field is encoded in the long format.
|
||||
outpos = greeting_output_buffer;
|
||||
outpos [outsize++] = 0xff;
|
||||
put_uint64 (&outpos [outsize], options.identity_size + 1);
|
||||
outsize += 8;
|
||||
outpos [outsize++] = 0x7f;
|
||||
if(options.raw_sock){
|
||||
// no handshaking for raw sock, instantiate raw encoder and decoders
|
||||
encoder = new (std::nothrow) raw_encoder_t (out_batch_size, session);
|
||||
alloc_assert (encoder);
|
||||
|
||||
decoder = new (std::nothrow)
|
||||
raw_decoder_t (in_batch_size, options.maxmsgsize, session);
|
||||
alloc_assert (decoder);
|
||||
|
||||
// disable handshaking for raw socket
|
||||
handshaking = false;
|
||||
}else{
|
||||
// Send the 'length' and 'flags' fields of the identity message.
|
||||
// The 'length' field is encoded in the long format.
|
||||
outpos = greeting_output_buffer;
|
||||
outpos [outsize++] = 0xff;
|
||||
put_uint64 (&outpos [outsize], options.identity_size + 1);
|
||||
outsize += 8;
|
||||
outpos [outsize++] = 0x7f;
|
||||
}
|
||||
|
||||
set_pollin (handle);
|
||||
set_pollout (handle);
|
||||
@@ -181,6 +196,7 @@ void zmq::stream_engine_t::in_event ()
|
||||
|
||||
zmq_assert (decoder);
|
||||
bool disconnection = false;
|
||||
size_t processed;
|
||||
|
||||
// If there's no data to process in the buffer...
|
||||
if (!insize) {
|
||||
@@ -199,8 +215,16 @@ void zmq::stream_engine_t::in_event ()
|
||||
}
|
||||
}
|
||||
|
||||
// Push the data to the decoder.
|
||||
size_t processed = decoder->process_buffer (inpos, insize);
|
||||
if(options.raw_sock){
|
||||
if(insize == 0 || !decoder->message_ready_size(insize)){
|
||||
processed = 0;
|
||||
}else{
|
||||
processed = decoder->process_buffer (inpos, insize);
|
||||
}
|
||||
}else{
|
||||
// Push the data to the decoder.
|
||||
processed = decoder->process_buffer (inpos, insize);
|
||||
}
|
||||
|
||||
if (unlikely (processed == (size_t) -1)) {
|
||||
disconnection = true;
|
||||
|
||||
Reference in New Issue
Block a user