mirror of
https://github.com/zeromq/libzmq.git
synced 2025-10-25 10:09:38 +02:00
Check message syntax in REQ asynchronously
This patch adds support for checking messages as they arrive (as opposed to when they are recv'd by the user) and drop the connection if they are malformed. It also uses this new feature to check for validity of inbound messages in REQ socket. Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
This commit is contained in:
@@ -23,6 +23,7 @@
|
|||||||
|
|
||||||
#include "decoder.hpp"
|
#include "decoder.hpp"
|
||||||
#include "session_base.hpp"
|
#include "session_base.hpp"
|
||||||
|
#include "likely.hpp"
|
||||||
#include "wire.hpp"
|
#include "wire.hpp"
|
||||||
#include "err.hpp"
|
#include "err.hpp"
|
||||||
|
|
||||||
@@ -136,8 +137,14 @@ bool zmq::decoder_t::message_ready ()
|
|||||||
{
|
{
|
||||||
// Message is completely read. Push it further and start reading
|
// Message is completely read. Push it further and start reading
|
||||||
// new message. (in_progress is a 0-byte message after this point.)
|
// new message. (in_progress is a 0-byte message after this point.)
|
||||||
if (!session || !session->write (&in_progress))
|
if (unlikely (!session))
|
||||||
return false;
|
return false;
|
||||||
|
int rc = session->write (&in_progress);
|
||||||
|
if (unlikely (rc != 0)) {
|
||||||
|
if (errno != EAGAIN)
|
||||||
|
decoding_error ();
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
next_step (tmpbuf, 1, &decoder_t::one_byte_size_ready);
|
next_step (tmpbuf, 1, &decoder_t::one_byte_size_ready);
|
||||||
return true;
|
return true;
|
||||||
|
|||||||
@@ -164,10 +164,18 @@ namespace zmq
|
|||||||
|
|
||||||
private:
|
private:
|
||||||
|
|
||||||
|
// Where to store the read data.
|
||||||
unsigned char *read_pos;
|
unsigned char *read_pos;
|
||||||
|
|
||||||
|
// How much data to read before taking next step.
|
||||||
size_t to_read;
|
size_t to_read;
|
||||||
|
|
||||||
|
// Next step. If set to NULL, it means that associated data stream
|
||||||
|
// is dead. Note that there can be still data in the process in such
|
||||||
|
// case.
|
||||||
step_t next;
|
step_t next;
|
||||||
|
|
||||||
|
// The duffer for data to decode.
|
||||||
size_t bufsize;
|
size_t bufsize;
|
||||||
unsigned char *buf;
|
unsigned char *buf;
|
||||||
|
|
||||||
|
|||||||
@@ -20,6 +20,7 @@
|
|||||||
|
|
||||||
#include "encoder.hpp"
|
#include "encoder.hpp"
|
||||||
#include "session_base.hpp"
|
#include "session_base.hpp"
|
||||||
|
#include "likely.hpp"
|
||||||
#include "wire.hpp"
|
#include "wire.hpp"
|
||||||
|
|
||||||
zmq::encoder_t::encoder_t (size_t bufsize_) :
|
zmq::encoder_t::encoder_t (size_t bufsize_) :
|
||||||
@@ -62,7 +63,14 @@ bool zmq::encoder_t::message_ready ()
|
|||||||
// Note that new state is set only if write is successful. That way
|
// Note that new state is set only if write is successful. That way
|
||||||
// unsuccessful write will cause retry on the next state machine
|
// unsuccessful write will cause retry on the next state machine
|
||||||
// invocation.
|
// invocation.
|
||||||
if (!session || !session->read (&in_progress)) {
|
if (unlikely (!session)) {
|
||||||
|
rc = in_progress.init ();
|
||||||
|
errno_assert (rc == 0);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
rc = session->read (&in_progress);
|
||||||
|
if (unlikely (rc != 0)) {
|
||||||
|
errno_assert (errno == EAGAIN);
|
||||||
rc = in_progress.init ();
|
rc = in_progress.init ();
|
||||||
errno_assert (rc == 0);
|
errno_assert (rc == 0);
|
||||||
return false;
|
return false;
|
||||||
|
|||||||
@@ -142,11 +142,20 @@ namespace zmq
|
|||||||
|
|
||||||
private:
|
private:
|
||||||
|
|
||||||
|
// Where to get the data to write from.
|
||||||
unsigned char *write_pos;
|
unsigned char *write_pos;
|
||||||
|
|
||||||
|
// How much data to write before next step should be executed.
|
||||||
size_t to_write;
|
size_t to_write;
|
||||||
|
|
||||||
|
// Next step. If set to NULL, it means that associated data stream
|
||||||
|
// is dead.
|
||||||
step_t next;
|
step_t next;
|
||||||
|
|
||||||
|
// If true, first byte of the message is being written.
|
||||||
bool beginning;
|
bool beginning;
|
||||||
|
|
||||||
|
// The buffer for encoded data.
|
||||||
size_t bufsize;
|
size_t bufsize;
|
||||||
unsigned char *buf;
|
unsigned char *buf;
|
||||||
|
|
||||||
|
|||||||
20
src/req.cpp
20
src/req.cpp
@@ -158,3 +158,23 @@ zmq::req_session_t::~req_session_t ()
|
|||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int zmq::req_session_t::write (msg_t *msg_)
|
||||||
|
{
|
||||||
|
if (state == request_id) {
|
||||||
|
if (msg_->flags () == msg_t::label && msg_->size () == 4) {
|
||||||
|
state = body;
|
||||||
|
return xreq_session_t::write (msg_);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
if (msg_->flags () == msg_t::more)
|
||||||
|
return xreq_session_t::write (msg_);
|
||||||
|
if (msg_->flags () == 0) {
|
||||||
|
state = request_id;
|
||||||
|
return xreq_session_t::write (msg_);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
errno = EFAULT;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -67,8 +67,16 @@ namespace zmq
|
|||||||
const char *protocol_, const char *address_);
|
const char *protocol_, const char *address_);
|
||||||
~req_session_t ();
|
~req_session_t ();
|
||||||
|
|
||||||
|
// Overloads of the functions from session_base_t.
|
||||||
|
int write (msg_t *msg_);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
|
||||||
|
enum {
|
||||||
|
request_id,
|
||||||
|
body
|
||||||
|
} state;
|
||||||
|
|
||||||
req_session_t (const req_session_t&);
|
req_session_t (const req_session_t&);
|
||||||
const req_session_t &operator = (const req_session_t&);
|
const req_session_t &operator = (const req_session_t&);
|
||||||
};
|
};
|
||||||
|
|||||||
@@ -148,28 +148,28 @@ void zmq::session_base_t::attach_pipe (pipe_t *pipe_)
|
|||||||
pipe->set_event_sink (this);
|
pipe->set_event_sink (this);
|
||||||
}
|
}
|
||||||
|
|
||||||
bool zmq::session_base_t::read (msg_t *msg_)
|
int zmq::session_base_t::read (msg_t *msg_)
|
||||||
{
|
{
|
||||||
if (!pipe)
|
if (!pipe || !pipe->read (msg_)) {
|
||||||
return false;
|
errno = EAGAIN;
|
||||||
|
return -1;
|
||||||
if (!pipe->read (msg_))
|
}
|
||||||
return false;
|
|
||||||
|
|
||||||
incomplete_in =
|
incomplete_in =
|
||||||
msg_->flags () & (msg_t::more | msg_t::label) ? true : false;
|
msg_->flags () & (msg_t::more | msg_t::label) ? true : false;
|
||||||
return true;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool zmq::session_base_t::write (msg_t *msg_)
|
int zmq::session_base_t::write (msg_t *msg_)
|
||||||
{
|
{
|
||||||
if (pipe && pipe->write (msg_)) {
|
if (pipe && pipe->write (msg_)) {
|
||||||
int rc = msg_->init ();
|
int rc = msg_->init ();
|
||||||
errno_assert (rc == 0);
|
errno_assert (rc == 0);
|
||||||
return true;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
return false;
|
errno = EAGAIN;
|
||||||
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
void zmq::session_base_t::flush ()
|
void zmq::session_base_t::flush ()
|
||||||
|
|||||||
@@ -48,8 +48,8 @@ namespace zmq
|
|||||||
void attach_pipe (class pipe_t *pipe_);
|
void attach_pipe (class pipe_t *pipe_);
|
||||||
|
|
||||||
// Following functions are the interface exposed towards the engine.
|
// Following functions are the interface exposed towards the engine.
|
||||||
bool read (msg_t *msg_);
|
virtual int read (msg_t *msg_);
|
||||||
bool write (msg_t *msg_);
|
virtual int write (msg_t *msg_);
|
||||||
void flush ();
|
void flush ();
|
||||||
void detach ();
|
void detach ();
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user