mirror of
https://github.com/zeromq/libzmq.git
synced 2025-02-20 22:31:34 +01:00
Style fixes
This commit is contained in:
parent
abbe34cdc2
commit
9d8eb1f9b9
@ -143,8 +143,9 @@ namespace zmq
|
||||
}
|
||||
}
|
||||
|
||||
inline bool message_ready_size (size_t msg_sz){
|
||||
zmq_assert(false);
|
||||
inline bool message_ready_size (size_t msg_sz)
|
||||
{
|
||||
zmq_assert (false);
|
||||
return false;
|
||||
}
|
||||
|
||||
|
@ -41,7 +41,7 @@ namespace zmq
|
||||
virtual size_t process_buffer (unsigned char *data_, size_t size_) = 0;
|
||||
|
||||
virtual bool stalled () const = 0;
|
||||
|
||||
|
||||
virtual bool message_ready_size (size_t msg_sz) = 0;
|
||||
};
|
||||
|
||||
|
@ -114,7 +114,7 @@ namespace zmq
|
||||
|
||||
// If true, the identity message is forwarded to the socket.
|
||||
bool recv_identity;
|
||||
|
||||
|
||||
// if true, router socket accepts non-zmq tcp connections
|
||||
bool raw_sock;
|
||||
|
||||
|
@ -93,7 +93,7 @@ bool zmq::raw_decoder_t::raw_message_ready ()
|
||||
// raw_message_ready should never get called in state machine w/o
|
||||
// message_ready_size from stream_engine.
|
||||
next_step (in_progress.data (), 1,
|
||||
&raw_decoder_t::raw_message_ready);
|
||||
&raw_decoder_t::raw_message_ready);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
@ -38,7 +38,7 @@ namespace zmq
|
||||
{
|
||||
public:
|
||||
|
||||
raw_decoder_t (size_t bufsize_,
|
||||
raw_decoder_t (size_t bufsize_,
|
||||
int64_t maxmsgsize_, i_msg_sink *msg_sink_);
|
||||
virtual ~raw_decoder_t ();
|
||||
|
||||
|
@ -58,7 +58,6 @@ bool zmq::raw_encoder_t::raw_message_size_ready ()
|
||||
|
||||
bool zmq::raw_encoder_t::raw_message_ready ()
|
||||
{
|
||||
|
||||
// Destroy content of the old message.
|
||||
int rc = in_progress.close ();
|
||||
errno_assert (rc == 0);
|
||||
|
@ -40,7 +40,6 @@
|
||||
namespace zmq
|
||||
{
|
||||
|
||||
|
||||
// Encoder for 0MQ framing protocol. Converts messages into data batches.
|
||||
|
||||
class raw_encoder_t : public encoder_base_t <raw_encoder_t>
|
||||
|
@ -78,8 +78,8 @@ void zmq::router_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)
|
||||
int zmq::router_t::xsetsockopt (int option_, const void *optval_,
|
||||
size_t optvallen_)
|
||||
{
|
||||
if (option_ != ZMQ_ROUTER_MANDATORY &&
|
||||
option_ != ZMQ_ROUTER_RAW_SOCK) {
|
||||
if (option_ != ZMQ_ROUTER_MANDATORY
|
||||
&& option_ != ZMQ_ROUTER_RAW_SOCK) {
|
||||
errno = EINVAL;
|
||||
return -1;
|
||||
}
|
||||
@ -87,16 +87,15 @@ int zmq::router_t::xsetsockopt (int option_, const void *optval_,
|
||||
errno = EINVAL;
|
||||
return -1;
|
||||
}
|
||||
if(option_ == ZMQ_ROUTER_RAW_SOCK){
|
||||
raw_sock = *static_cast <const int*> (optval_);
|
||||
if(raw_sock){
|
||||
if (option_ == ZMQ_ROUTER_RAW_SOCK) {
|
||||
raw_sock = *static_cast <const int*> (optval_);
|
||||
if (raw_sock) {
|
||||
options.recv_identity = false;
|
||||
options.raw_sock = true;
|
||||
}
|
||||
|
||||
}else{
|
||||
mandatory = *static_cast <const int*> (optval_);
|
||||
}
|
||||
else
|
||||
mandatory = *static_cast <const int*> (optval_);
|
||||
return 0;
|
||||
}
|
||||
|
||||
@ -170,8 +169,8 @@ int zmq::router_t::xsend (msg_t *msg_, int flags_)
|
||||
it->second.active = false;
|
||||
current_out = NULL;
|
||||
}
|
||||
}
|
||||
else
|
||||
}
|
||||
else
|
||||
if (mandatory) {
|
||||
more_out = false;
|
||||
errno = EHOSTUNREACH;
|
||||
@ -186,9 +185,9 @@ int zmq::router_t::xsend (msg_t *msg_, int flags_)
|
||||
return 0;
|
||||
}
|
||||
|
||||
// ignore the MORE flag for raw-sock or assert?
|
||||
if(options.raw_sock)
|
||||
msg_->reset_flags(msg_t::more);
|
||||
// Ignore the MORE flag for raw-sock or assert?
|
||||
if (options.raw_sock)
|
||||
msg_->reset_flags (msg_t::more);
|
||||
|
||||
// Check whether this is the last part of the message.
|
||||
more_out = msg_->flags () & msg_t::more ? true : false;
|
||||
@ -199,13 +198,13 @@ int zmq::router_t::xsend (msg_t *msg_, int flags_)
|
||||
// Close the remote connection if user has asked to do so
|
||||
// by sending zero length message.
|
||||
// Pending messages in the pipe will be dropped (on receiving term- ack)
|
||||
if (raw_sock && msg_->size() == 0){
|
||||
current_out->terminate(false);
|
||||
if (raw_sock && msg_->size() == 0) {
|
||||
current_out->terminate (false);
|
||||
int rc = msg_->close ();
|
||||
errno_assert (rc == 0);
|
||||
current_out = NULL;
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
bool ok = current_out->write (msg_);
|
||||
if (unlikely (!ok))
|
||||
@ -349,12 +348,13 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_)
|
||||
blob_t identity;
|
||||
bool ok;
|
||||
|
||||
if(options.raw_sock){ // always assign identity for raw-socket
|
||||
if (options.raw_sock) { // Always assign identity for raw-socket
|
||||
unsigned char buf [5];
|
||||
buf [0] = 0;
|
||||
put_uint32 (buf + 1, next_peer_id++);
|
||||
identity = blob_t (buf, sizeof buf);
|
||||
}else{
|
||||
}
|
||||
else {
|
||||
msg.init ();
|
||||
ok = pipe_->read (&msg);
|
||||
if (!ok)
|
||||
|
@ -120,10 +120,10 @@ zmq::session_base_t::session_base_t (class io_thread_t *io_thread_,
|
||||
identity_received (false),
|
||||
addr (addr_)
|
||||
{
|
||||
// identities are not exchanged for raw sockets
|
||||
if(options.raw_sock){
|
||||
identity_sent = (true);
|
||||
identity_received = (true);
|
||||
// Identities are not exchanged for raw sockets
|
||||
if (options.raw_sock) {
|
||||
identity_sent = true;
|
||||
identity_received = true;
|
||||
}
|
||||
}
|
||||
|
||||
@ -250,12 +250,12 @@ void zmq::session_base_t::terminated (pipe_t *pipe_)
|
||||
// Remove the pipe from the detached pipes set
|
||||
terminating_pipes.erase (pipe_);
|
||||
|
||||
if (!is_terminating() && options.raw_sock){
|
||||
if(engine){
|
||||
if (!is_terminating () && options.raw_sock) {
|
||||
if (engine) {
|
||||
engine->terminate ();
|
||||
engine = NULL;
|
||||
}
|
||||
terminate();
|
||||
terminate ();
|
||||
}
|
||||
|
||||
|
||||
|
@ -135,7 +135,7 @@ void zmq::stream_engine_t::plug (io_thread_t *io_thread_,
|
||||
io_object_t::plug (io_thread_);
|
||||
handle = add_fd (s);
|
||||
|
||||
if(options.raw_sock){
|
||||
if (options.raw_sock) {
|
||||
// no handshaking for raw sock, instantiate raw encoder and decoders
|
||||
encoder = new (std::nothrow) raw_encoder_t (out_batch_size, session);
|
||||
alloc_assert (encoder);
|
||||
@ -146,7 +146,8 @@ void zmq::stream_engine_t::plug (io_thread_t *io_thread_,
|
||||
|
||||
// disable handshaking for raw socket
|
||||
handshaking = false;
|
||||
}else{
|
||||
}
|
||||
else {
|
||||
// Send the 'length' and 'flags' fields of the identity message.
|
||||
// The 'length' field is encoded in the long format.
|
||||
outpos = greeting_output_buffer;
|
||||
@ -215,13 +216,13 @@ void zmq::stream_engine_t::in_event ()
|
||||
}
|
||||
}
|
||||
|
||||
if(options.raw_sock){
|
||||
if(insize == 0 || !decoder->message_ready_size(insize)){
|
||||
processed = 0;
|
||||
}else{
|
||||
if (options.raw_sock) {
|
||||
if (insize == 0 || !decoder->message_ready_size (insize))
|
||||
processed = 0;
|
||||
else
|
||||
processed = decoder->process_buffer (inpos, insize);
|
||||
}
|
||||
}else{
|
||||
}
|
||||
else {
|
||||
// Push the data to the decoder.
|
||||
processed = decoder->process_buffer (inpos, insize);
|
||||
}
|
||||
|
@ -31,78 +31,72 @@
|
||||
#include <assert.h>
|
||||
#include <fcntl.h>
|
||||
#include <zmq.h>
|
||||
#include<unistd.h>
|
||||
#include <unistd.h>
|
||||
|
||||
//ToDo: Windows?
|
||||
const char *test_str = "TEST-STRING";
|
||||
|
||||
|
||||
int tcp_client(){
|
||||
|
||||
int sockfd, portno;
|
||||
int tcp_client ()
|
||||
{
|
||||
struct sockaddr_in serv_addr;
|
||||
struct hostent *server;
|
||||
|
||||
portno = 5555;
|
||||
const int portno = 5555;
|
||||
|
||||
sockfd = socket(AF_INET, SOCK_STREAM, 0);
|
||||
assert(sockfd >=0 );
|
||||
server = gethostbyname("localhost");
|
||||
assert(server);
|
||||
int sockfd = socket (AF_INET, SOCK_STREAM, 0);
|
||||
assert (sockfd >= 0);
|
||||
server = gethostbyname ("localhost");
|
||||
assert (server);
|
||||
|
||||
bzero((char *) &serv_addr, sizeof(serv_addr));
|
||||
bzero (&serv_addr, sizeof serv_addr);
|
||||
serv_addr.sin_family = AF_INET;
|
||||
bcopy((char *)server->h_addr,
|
||||
(char *)&serv_addr.sin_addr.s_addr,
|
||||
server->h_length);
|
||||
serv_addr.sin_port = htons(portno);
|
||||
bcopy (server->h_addr, &serv_addr.sin_addr.s_addr, server->h_length);
|
||||
serv_addr.sin_port = htons (portno);
|
||||
|
||||
if (connect(sockfd,(struct sockaddr *) &serv_addr,sizeof(serv_addr)) < 0)
|
||||
assert(0);
|
||||
int rc = connect (sockfd, (struct sockaddr *) &serv_addr, sizeof serv_addr);
|
||||
assert (rc == 0);
|
||||
int nodelay = 1;
|
||||
int rc = setsockopt (sockfd, IPPROTO_TCP, TCP_NODELAY, (char*) &nodelay,
|
||||
sizeof (int));
|
||||
assert(rc == 0);
|
||||
|
||||
rc = setsockopt (sockfd, IPPROTO_TCP, TCP_NODELAY, (char*) &nodelay,
|
||||
sizeof nodelay);
|
||||
assert (rc == 0);
|
||||
|
||||
return sockfd;
|
||||
}
|
||||
|
||||
void tcp_client_write(int sockfd, const void *buf, int buf_len){
|
||||
assert(buf);
|
||||
int n = write(sockfd, buf, buf_len);
|
||||
assert(n >= 0);
|
||||
void tcp_client_write (int sockfd, const void *buf, int buf_len)
|
||||
{
|
||||
assert (buf);
|
||||
int n = write (sockfd, buf, buf_len);
|
||||
assert (n >= 0);
|
||||
}
|
||||
|
||||
void tcp_client_read(int sockfd){
|
||||
void tcp_client_read (int sockfd)
|
||||
{
|
||||
struct timeval tm;
|
||||
tm.tv_sec = 1;
|
||||
tm.tv_usec = 0;
|
||||
fd_set r;
|
||||
|
||||
int sr;
|
||||
char buffer[16];
|
||||
char buffer [16];
|
||||
|
||||
FD_ZERO(&r);
|
||||
FD_SET(sockfd, &r);
|
||||
FD_ZERO (&r);
|
||||
FD_SET (sockfd, &r);
|
||||
|
||||
if ((sr = select(sockfd + 1, &r, NULL, NULL, &tm)) <= 0)
|
||||
{
|
||||
assert(0);
|
||||
}
|
||||
int sr = select (sockfd + 1, &r, NULL, NULL, &tm);
|
||||
assert (sr > 0);
|
||||
|
||||
int n = read(sockfd, buffer, 16);
|
||||
assert(n>0);
|
||||
assert(memcmp(buffer, test_str, strlen(test_str)) == 0);
|
||||
int n = read (sockfd, buffer, 16);
|
||||
assert (n > 0);
|
||||
assert (memcmp (buffer, test_str, strlen (test_str)) == 0);
|
||||
}
|
||||
|
||||
|
||||
void tcp_client_close(int sockfd){
|
||||
close(sockfd);
|
||||
void tcp_client_close (int sockfd)
|
||||
{
|
||||
close (sockfd);
|
||||
}
|
||||
|
||||
|
||||
int main(){
|
||||
int main ()
|
||||
{
|
||||
fprintf (stderr, "test_raw_sock running...\n");
|
||||
|
||||
zmq_msg_t message;
|
||||
@ -112,56 +106,49 @@ int main(){
|
||||
void *ctx = zmq_init (1);
|
||||
assert (ctx);
|
||||
|
||||
int raw_sock = 1, rc = 0;
|
||||
void *sb = zmq_socket (ctx, ZMQ_ROUTER);
|
||||
assert (sb);
|
||||
rc = zmq_setsockopt( sb, ZMQ_ROUTER_RAW_SOCK, &raw_sock, sizeof(int));
|
||||
assert(rc == 0);
|
||||
|
||||
int raw_sock = 1;
|
||||
int rc = zmq_setsockopt (sb, ZMQ_ROUTER_RAW_SOCK, &raw_sock, sizeof raw_sock);
|
||||
assert (rc == 0);
|
||||
rc = zmq_bind (sb, "tcp://127.0.0.1:5555");
|
||||
assert (rc == 0);
|
||||
|
||||
int sock_fd = tcp_client();
|
||||
assert(sock_fd >= 0);
|
||||
int sock_fd = tcp_client ();
|
||||
assert (sock_fd >= 0);
|
||||
// ===================
|
||||
|
||||
zmq_msg_init(&message);
|
||||
zmq_msg_init(&id);
|
||||
zmq_msg_init (&message);
|
||||
zmq_msg_init (&id);
|
||||
assert (rc == 0);
|
||||
|
||||
zmq_pollitem_t items [] = {
|
||||
{ sb, 0, ZMQ_POLLIN, 0 },
|
||||
};
|
||||
|
||||
tcp_client_write(sock_fd, test_str, strlen(test_str));
|
||||
tcp_client_write (sock_fd, test_str, strlen (test_str));
|
||||
zmq_poll (items, 1, 500);
|
||||
if (items [0].revents & ZMQ_POLLIN) {
|
||||
int n = zmq_msg_recv (&id, sb, 0);
|
||||
assert(n > 0);
|
||||
n = zmq_msg_recv (&message, sb, 0);
|
||||
assert(n > 0);
|
||||
assert(memcmp(zmq_msg_data (&message), test_str, strlen(test_str)) == 0);
|
||||
}else{
|
||||
assert(0);
|
||||
}
|
||||
assert (items [0].revents & ZMQ_POLLIN);
|
||||
int n = zmq_msg_recv (&id, sb, 0);
|
||||
assert (n > 0);
|
||||
n = zmq_msg_recv (&message, sb, 0);
|
||||
assert (n > 0);
|
||||
assert (memcmp (zmq_msg_data (&message), test_str, strlen (test_str)) == 0);
|
||||
|
||||
zmq_msg_send (&id, sb, ZMQ_SNDMORE);
|
||||
zmq_msg_send (&message, sb, ZMQ_SNDMORE);// SNDMORE option is ignored
|
||||
zmq_msg_send (&message, sb, ZMQ_SNDMORE); // SNDMORE option is ignored
|
||||
|
||||
tcp_client_read(sock_fd);
|
||||
tcp_client_close(sock_fd);
|
||||
tcp_client_read (sock_fd);
|
||||
tcp_client_close (sock_fd);
|
||||
|
||||
zmq_msg_close(&id);
|
||||
zmq_msg_close(&message);
|
||||
zmq_msg_close (&id);
|
||||
zmq_msg_close (&message);
|
||||
|
||||
|
||||
zmq_close(sb);
|
||||
zmq_term(ctx);
|
||||
zmq_close (sb);
|
||||
zmq_term (ctx);
|
||||
|
||||
fprintf (stderr, "test_raw_sock PASSED.\n");
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user