mirror of
https://github.com/zeromq/libzmq.git
synced 2025-01-19 08:46:44 +01:00
handle decoding malformed messages
Signed-off-by: Dhammika Pathirana <dhammika@gmail.com>
This commit is contained in:
parent
8d6979922e
commit
71bef330fc
@ -54,16 +54,22 @@ bool zmq::decoder_t::one_byte_size_ready ()
|
|||||||
next_step (tmpbuf, 8, &decoder_t::eight_byte_size_ready);
|
next_step (tmpbuf, 8, &decoder_t::eight_byte_size_ready);
|
||||||
else {
|
else {
|
||||||
|
|
||||||
// TODO: Handle over-sized message decently.
|
|
||||||
|
|
||||||
// There has to be at least one byte (the flags) in the message).
|
// There has to be at least one byte (the flags) in the message).
|
||||||
zmq_assert (*tmpbuf > 0);
|
if (!*tmpbuf) {
|
||||||
|
decoding_error ();
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
// in_progress is initialised at this point so in theory we should
|
// in_progress is initialised at this point so in theory we should
|
||||||
// close it before calling zmq_msg_init_size, however, it's a 0-byte
|
// close it before calling zmq_msg_init_size, however, it's a 0-byte
|
||||||
// message and thus we can treat it as uninitialised...
|
// message and thus we can treat it as uninitialised...
|
||||||
int rc = zmq_msg_init_size (&in_progress, *tmpbuf - 1);
|
int rc = zmq_msg_init_size (&in_progress, *tmpbuf - 1);
|
||||||
|
if (rc != 0 && errno == ENOMEM) {
|
||||||
|
decoding_error ();
|
||||||
|
return false;
|
||||||
|
}
|
||||||
errno_assert (rc == 0);
|
errno_assert (rc == 0);
|
||||||
|
|
||||||
next_step (tmpbuf, 1, &decoder_t::flags_ready);
|
next_step (tmpbuf, 1, &decoder_t::flags_ready);
|
||||||
}
|
}
|
||||||
return true;
|
return true;
|
||||||
@ -75,19 +81,23 @@ bool zmq::decoder_t::eight_byte_size_ready ()
|
|||||||
// read the message data into it.
|
// read the message data into it.
|
||||||
size_t size = (size_t) get_uint64 (tmpbuf);
|
size_t size = (size_t) get_uint64 (tmpbuf);
|
||||||
|
|
||||||
// TODO: Handle over-sized message decently.
|
|
||||||
|
|
||||||
// There has to be at least one byte (the flags) in the message).
|
// There has to be at least one byte (the flags) in the message).
|
||||||
zmq_assert (size > 0);
|
if (!size) {
|
||||||
|
decoding_error ();
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
// in_progress is initialised at this point so in theory we should
|
// in_progress is initialised at this point so in theory we should
|
||||||
// close it before calling zmq_msg_init_size, however, it's a 0-byte
|
// close it before calling zmq_msg_init_size, however, it's a 0-byte
|
||||||
// message and thus we can treat it as uninitialised...
|
// message and thus we can treat it as uninitialised...
|
||||||
int rc = zmq_msg_init_size (&in_progress, size - 1);
|
int rc = zmq_msg_init_size (&in_progress, size - 1);
|
||||||
|
if (rc != 0 && errno == ENOMEM) {
|
||||||
|
decoding_error ();
|
||||||
|
return false;
|
||||||
|
}
|
||||||
errno_assert (rc == 0);
|
errno_assert (rc == 0);
|
||||||
next_step (tmpbuf, 1, &decoder_t::flags_ready);
|
|
||||||
|
|
||||||
|
next_step (tmpbuf, 1, &decoder_t::flags_ready);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -98,9 +98,13 @@ namespace zmq
|
|||||||
read_pos += size_;
|
read_pos += size_;
|
||||||
to_read -= size_;
|
to_read -= size_;
|
||||||
|
|
||||||
while (!to_read)
|
while (!to_read) {
|
||||||
if (!(static_cast <T*> (this)->*next) ())
|
if (!(static_cast <T*> (this)->*next) ()) {
|
||||||
|
if (unlikely (!(static_cast <T*> (this)->next)))
|
||||||
|
return (size_t) -1;
|
||||||
return size_;
|
return size_;
|
||||||
|
}
|
||||||
|
}
|
||||||
return size_;
|
return size_;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -109,9 +113,13 @@ namespace zmq
|
|||||||
|
|
||||||
// Try to get more space in the message to fill in.
|
// Try to get more space in the message to fill in.
|
||||||
// If none is available, return.
|
// If none is available, return.
|
||||||
while (!to_read)
|
while (!to_read) {
|
||||||
if (!(static_cast <T*> (this)->*next) ())
|
if (!(static_cast <T*> (this)->*next) ()) {
|
||||||
|
if (unlikely (!(static_cast <T*> (this)->next)))
|
||||||
|
return (size_t) -1;
|
||||||
return pos;
|
return pos;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// If there are no more data in the buffer, return.
|
// If there are no more data in the buffer, return.
|
||||||
if (pos == size_)
|
if (pos == size_)
|
||||||
@ -142,6 +150,13 @@ namespace zmq
|
|||||||
next = next_;
|
next = next_;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// This function should be called from the derived class to
|
||||||
|
// abort decoder state machine.
|
||||||
|
inline void decoding_error ()
|
||||||
|
{
|
||||||
|
next = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
|
||||||
unsigned char *read_pos;
|
unsigned char *read_pos;
|
||||||
|
@ -119,18 +119,24 @@ void zmq::zmq_engine_t::in_event ()
|
|||||||
// Push the data to the decoder.
|
// Push the data to the decoder.
|
||||||
size_t processed = decoder.process_buffer (inpos, insize);
|
size_t processed = decoder.process_buffer (inpos, insize);
|
||||||
|
|
||||||
// Stop polling for input if we got stuck.
|
if (unlikely (processed == (size_t) -1)) {
|
||||||
if (processed < insize) {
|
disconnection = true;
|
||||||
|
|
||||||
// This may happen if queue limits are in effect or when
|
|
||||||
// init object reads all required information from the socket
|
|
||||||
// and rejects to read more data.
|
|
||||||
reset_pollin (handle);
|
|
||||||
}
|
}
|
||||||
|
else {
|
||||||
|
|
||||||
// Adjust the buffer.
|
// Stop polling for input if we got stuck.
|
||||||
inpos += processed;
|
if (processed < insize) {
|
||||||
insize -= processed;
|
|
||||||
|
// This may happen if queue limits are in effect or when
|
||||||
|
// init object reads all required information from the socket
|
||||||
|
// and rejects to read more data.
|
||||||
|
reset_pollin (handle);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Adjust the buffer.
|
||||||
|
inpos += processed;
|
||||||
|
insize -= processed;
|
||||||
|
}
|
||||||
|
|
||||||
// Flush all messages the decoder may have produced.
|
// Flush all messages the decoder may have produced.
|
||||||
inout->flush ();
|
inout->flush ();
|
||||||
|
Loading…
x
Reference in New Issue
Block a user