Computation of buffer size for PGM fixed.

Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
This commit is contained in:
Martin Sustrik 2011-02-21 11:22:54 +01:00
parent 12486fecc4
commit 5c0931121b
2 changed files with 32 additions and 12 deletions

View File

@ -151,8 +151,9 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_)
// All options are of data type int // All options are of data type int
const int encapsulation_port = port_number; const int encapsulation_port = port_number;
if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_UDP_ENCAP_UCAST_PORT, if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_UDP_ENCAP_UCAST_PORT,
&encapsulation_port, sizeof (encapsulation_port)) || &encapsulation_port, sizeof (encapsulation_port)))
!pgm_setsockopt (sock, IPPROTO_PGM, PGM_UDP_ENCAP_MCAST_PORT, goto err_abort;
if (!pgm_setsockopt (sock, IPPROTO_PGM, PGM_UDP_ENCAP_MCAST_PORT,
&encapsulation_port, sizeof (encapsulation_port))) &encapsulation_port, sizeof (encapsulation_port)))
goto err_abort; goto err_abort;
} }
@ -200,11 +201,7 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_)
if (receiver) { if (receiver) {
const int recv_only = 1, const int recv_only = 1,
rxw_max_tpdu = (int) pgm_max_tpdu, rxw_max_tpdu = (int) pgm_max_tpdu,
rxw_sqns = (options.recovery_ivl_msec >= 0 ? rxw_sqns = compute_sqns (rxw_max_tpdu),
options.recovery_ivl_msec * options.rate /
(1000 * rxw_max_tpdu) :
options.recovery_ivl * options.rate /
rxw_max_tpdu),
peer_expiry = pgm_secs (300), peer_expiry = pgm_secs (300),
spmr_expiry = pgm_msecs (25), spmr_expiry = pgm_msecs (25),
nak_bo_ivl = pgm_msecs (50), nak_bo_ivl = pgm_msecs (50),
@ -235,11 +232,7 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_)
} else { } else {
const int send_only = 1, const int send_only = 1,
txw_max_tpdu = (int) pgm_max_tpdu, txw_max_tpdu = (int) pgm_max_tpdu,
txw_sqns = (options.recovery_ivl_msec >= 0 ? txw_sqns = compute_sqns (txw_max_tpdu),
options.recovery_ivl_msec * options.rate /
(1000 * txw_max_tpdu) :
options.recovery_ivl * options.rate /
txw_max_tpdu),
ambient_spm = pgm_secs (30), ambient_spm = pgm_secs (30),
heartbeat_spm[] = { pgm_msecs (100), heartbeat_spm[] = { pgm_msecs (100),
pgm_msecs (100), pgm_msecs (100),
@ -680,5 +673,29 @@ void zmq::pgm_socket_t::process_upstream ()
errno = EAGAIN; errno = EAGAIN;
} }
int zmq::pgm_socket_t::compute_sqns (int tpdu_)
{
// Convert rate into B/ms.
uint64_t rate = ((uint64_t) options.rate) / 8;
// Get recovery interval in milliseconds.
uint64_t interval = options.recovery_ivl_msec >= 0 ?
options.recovery_ivl_msec :
options.recovery_ivl * 1000;
// Compute the size of the buffer in bytes.
uint64_t size = interval * rate;
// Translate the size into number of packets.
uint64_t sqns = size / tpdu_;
zmq_assert (sqns >= 0);
// Buffer should be able to contain at least one packet.
if (sqns == 0)
sqns = 1;
return sqns;
}
#endif #endif

View File

@ -81,6 +81,9 @@ namespace zmq
private: private:
// Compute size of the buffer based on rate and recovery interval.
int compute_sqns (int tpdu_);
// OpenPGM transport. // OpenPGM transport.
pgm_sock_t* sock; pgm_sock_t* sock;