Add NORM Transport configuration sockopts (#4541)

* added NORM transport configuration socket options, and enabled NORM use of existing sockopts ZMQ_RATE for NORM fixed-rate operation, and ZMQ_TOS
This commit is contained in:
Jeff Weston 2023-04-20 13:55:40 -04:00 committed by GitHub
parent be8af6f128
commit 2d30020691
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 504 additions and 9 deletions

12
RELICENSE/weston-nrl.md Normal file
View File

@ -0,0 +1,12 @@
# Permission to Relicense under MPLv2
This is a statement by Jeffery Weston that grants permission to relicense its
copyrights in the libzmq C++ library (ZeroMQ) under the Mozilla Public License
v2 (MPLv2).
A portion of the commits made by the Github handle "weston-nrl", with commit
author "Jeff Weston" or "Jeffery Weston" are copyright of Jeffery Weston. This
document hereby grants the libzmq project team to relicense libzmq, including
all past, present and future contributions of the author listed above.
Jeffery Weston 2023/04/05 s

View File

@ -998,6 +998,145 @@ Default value:: 0
Applicable socket types:: ZMQ_PUB, ZMQ_XPUB, ZMQ_SUB, ZMQ_XSUB
ZMQ_NORM_MODE: Retrieve NORM Sender Mode
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Gets the NORM sender mode to control the operation of the NORM transport. NORM
supports fixed rate operation (0='ZMQ_NORM_FIXED'), congestion control mode
(1='ZMQ_NORM_CC'), loss-tolerant congestion control (2='ZMQ_NORM_CCL'), explicit
congestion notification (ECN)-enabled congestion control (3='ZMQ_NORM_CCE'), and
ECN-only congestion control (4='ZMQ_NORM_CCE_ECNONLY'). The default value is
TCP-friendly congestion control mode. Fixed rate mode (using datarate set by
'ZMQ_RATE') offers better performance, but care must be taken to prevent data
loss.
NOTE: in DRAFT state, not yet available in stable releases.
[horizontal]
Option value type:: int
Option value unit:: 0, 1, 2, 3, 4
Default value:: 1 ('ZMQ_NORM_CC')
Applicable socket types:: All, when using NORM transport.
ZMQ_NORM_UNICAST_NACK: Retrieve NORM Unicast NACK mode
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Retrieves status of NORM unicast NACK mode setting for multicast receivers. If
set, NORM receiver will send Negative ACKnowledgements (NACKs) back to the
sender using unicast instead of multicast. NORM transport endpoints specifying
a unicast address will use unicast NACKs by default (without setting
'ZMQ_NORM_UNICAST_NACK').
NOTE: in DRAFT state, not yet available in stable releases.
[horizontal]
Option value type:: int
Option value unit:: boolean
Default value:: 0 (false)
Applicable socket types:: All, when using NORM transport.
ZMQ_NORM_BUFFER_SIZE: Retrieve NORM buffer size
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Gets NORM buffer size for NORM transport sender, receiver, and stream.
NOTE: in DRAFT state, not yet available in stable releases.
[horizontal]
Option value type:: int
Option value unit:: kilobytes
Default value:: 2048
Applicable socket types:: All, when using NORM transport.
ZMQ_NORM_SEGMENT_SIZE: Retrieve NORM segment size
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Gets NORM sender segment size, which is the maximum message payload size of
individual NORM messages (ZMQ messages may be split over multiple NORM
messages). Ideally, this value should fit within the system/network maximum
transmission unit (MTU) after accounting for additional NORM message headers
(up to 48 bytes).
NOTE: in DRAFT state, not yet available in stable releases.
[horizontal]
Option value type:: int
Option value unit:: bytes
Default value:: 1400
Applicable socket types:: All, when using NORM transport.
ZMQ_NORM_BLOCK_SIZE: Retrieve NORM block size
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Gets NORM sender block size, which is the number of segments in a NORM FEC
coding block. NORM repair operations take place at block boundaries. Maximum
value is 255, but parity packets ('ZMQ_NORM_NUM_PARITY') are limited to a value
of (255 - 'ZMQ_NORM_BLOCK_SIZE'). Minimum value is ('ZMQ_NORM_NUM_PARITY' + 1).
Effective value may be different based on the settings of 'ZMQ_NORM_NUM_PARITY'
and 'ZMQ_NORM_NUM_AUTOPARITY' if invalid settings are provided.
NOTE: in DRAFT state, not yet available in stable releases.
[horizontal]
Option value type:: int
Option value unit:: >0, <=255
Default value:: 16
Applicable socket types:: All, when using NORM transport.
ZMQ_NORM_NUM_PARITY: Retrieve NORM parity segment setting
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Gets the maximum number of NORM parity symbol segments that the sender is
willing to calculate per FEC coding block for the purpose of reparing lost data.
Maximum value is 255, but is further limited to a value of
(255 - 'ZMQ_NORM_BLOCK_SIZE'). Minimum value is 'ZMQ_NORM_NUM_AUTOPARITY'.
Effective value may be different based on the setting of
'ZMQ_NORM_NUM_AUTOPARITY' if invalid settings are provided.
NOTE: in DRAFT state, not yet available in stable releases.
[horizontal]
Option value type:: int
Option value unit:: >0, <255
Default value:: 4
Applicable socket types:: All, when using NORM transport.
ZMQ_NORM_NUM_AUTOPARITY: Retrieve proactive NORM parity segment setting
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Gets the number of NORM parity symbol segments that the sender will proactively
send at the end of each FEC coding block. By default, no proactive parity
segments will be sent; instead, parity segments will only be sent in response to
repair requests (NACKs). Maximum value is 255, but is further limited to a
maximum value of 'ZMQ_NORM_NUM_PARITY'.
NOTE: in DRAFT state, not yet available in stable releases.
[horizontal]
Option value type:: int
Option value unit:: >=0, <255
Default value:: 0
Applicable socket types:: All, when using NORM transport.
ZMQ_NORM_PUSH: Retrieve NORM push mode
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Gets status of NORM stream push mode, which alters the behavior of the sender
when enqueueing new data. By default, NORM will stop accepting new messages
while waiting for old data to be transmitted and/or repaired. Enabling push mode
discards the oldest data (which may be pending repair or may never have been
transmitted) in favor of accepting new data. This may be useful in cases where
it is more important to quickly deliver new data instead of reliably delivering
older data.
NOTE: in DRAFT state, not yet available in stable releases.
[horizontal]
Option value type:: int
Option value unit:: boolean
Default value:: 0 (false)
Applicable socket types:: All, when using NORM transport.
RETURN VALUE
------------
The _zmq_getsockopt()_ function shall return zero if successful. Otherwise it

View File

@ -1529,6 +1529,145 @@ Default value:: 8192
Applicable socket types:: All, when using TCP, IPC, PGM or NORM transport.
ZMQ_NORM_MODE: NORM Sender Mode
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Sets the NORM sender mode to control the operation of the NORM transport. NORM
supports fixed rate operation (0='ZMQ_NORM_FIXED'), congestion control mode
(1='ZMQ_NORM_CC'), loss-tolerant congestion control (2='ZMQ_NORM_CCL'), explicit
congestion notification (ECN)-enabled congestion control (3='ZMQ_NORM_CCE'), and
ECN-only congestion control (4='ZMQ_NORM_CCE_ECNONLY'). The default value is
TCP-friendly congestion control mode. Fixed rate mode (using datarate set by
'ZMQ_RATE') offers better performance, but care must be taken to prevent data
loss. ECN modes will set one of the ECN Capable Transport bits in the given
'ZMQ_TOS' if it is not set already.
NOTE: in DRAFT state, not yet available in stable releases.
[horizontal]
Option value type:: int
Option value unit:: 0, 1, 2, 3, 4
Default value:: 1 ('ZMQ_NORM_CC')
Applicable socket types:: All, when using NORM transport.
ZMQ_NORM_UNICAST_NACK: Set NORM Unicast NACK mode
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
If set, NORM receiver will send Negative ACKnowledgements (NACKs) back to the
sender using unicast instead of multicast. NORM transport endpoints specifying
a unicast address will enable this by default, but it is disabled by default for
multicast addresses.
NOTE: in DRAFT state, not yet available in stable releases.
[horizontal]
Option value type:: int
Option value unit:: boolean
Default value:: 0 (false)
Applicable socket types:: All, when using NORM transport.
ZMQ_NORM_BUFFER_SIZE: Set NORM buffer size
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Sets NORM buffer size for NORM transport sender, receiver, and stream.
NOTE: in DRAFT state, not yet available in stable releases.
[horizontal]
Option value type:: int
Option value unit:: kilobytes
Default value:: 2048
Applicable socket types:: All, when using NORM transport.
ZMQ_NORM_SEGMENT_SIZE: Set NORM segment size
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Sets NORM sender segment size, which is the maximum message payload size of
individual NORM messages (ZMQ messages may be split over multiple NORM
messages). Ideally, this value should fit within the system/network maximum
transmission unit (MTU) after accounting for additional NORM message headers
(up to 48 bytes).
NOTE: in DRAFT state, not yet available in stable releases.
[horizontal]
Option value type:: int
Option value unit:: bytes
Default value:: 1400
Applicable socket types:: All, when using NORM transport.
ZMQ_NORM_BLOCK_SIZE: Set NORM block size
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Sets NORM sender block size, which is the number of segments in a NORM FEC
coding block. NORM repair operations take place at block boundaries. Maximum
value is 255, but parity packets ('ZMQ_NORM_NUM_PARITY') are limited to a value
of (255 - 'ZMQ_NORM_BLOCK_SIZE'). Minimum value is ('ZMQ_NORM_NUM_PARITY' + 1).
Effective value may be different based on the settings of 'ZMQ_NORM_NUM_PARITY'
and 'ZMQ_NORM_NUM_AUTOPARITY' if invalid settings are provided.
NOTE: in DRAFT state, not yet available in stable releases.
[horizontal]
Option value type:: int
Option value unit:: >0, <=255
Default value:: 16
Applicable socket types:: All, when using NORM transport.
ZMQ_NORM_NUM_PARITY: Set number of NORM parity segments
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Sets the maximum number of NORM parity symbol segments that the sender is
willing to calculate per FEC coding block for the purpose of reparing lost data.
Maximum value is 255, but is further limited to a value of
(255 - 'ZMQ_NORM_BLOCK_SIZE'). Minimum value is 'ZMQ_NORM_NUM_AUTOPARITY'.
Effective value may be different based on the setting of
'ZMQ_NORM_NUM_AUTOPARITY' if invalid settings are provided.
NOTE: in DRAFT state, not yet available in stable releases.
[horizontal]
Option value type:: int
Option value unit:: >0, <255
Default value:: 4
Applicable socket types:: All, when using NORM transport.
ZMQ_NORM_NUM_AUTOPARITY: Set number of proactive NORM parity segments
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Sets the number of NORM parity symbol segments that the sender will proactively
send at the end of each FEC coding block. By default, no proactive parity
segments will be sent; instead, parity segments will only be sent in response to
repair requests (NACKs). Maximum value is 255, but is further limited to a
maximum value of 'ZMQ_NORM_NUM_PARITY'.
NOTE: in DRAFT state, not yet available in stable releases.
[horizontal]
Option value type:: int
Option value unit:: >=0, <255
Default value:: 0
Applicable socket types:: All, when using NORM transport.
ZMQ_NORM_PUSH: Enable NORM push mode
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Enables NORM stream push mode, which alters the behavior of the sender when
enqueueing new data. By default, NORM will stop accepting new messages while
waiting for old data to be transmitted and/or repaired. Enabling push mode
discards the oldest data (which may be pending repair or may never have been
transmitted) in favor of accepting new data. This may be useful in cases where
it is more important to quickly deliver new data instead of reliably delivering
older data.
NOTE: in DRAFT state, not yet available in stable releases.
[horizontal]
Option value type:: int
Option value unit:: boolean
Default value:: 0 (false)
Applicable socket types:: All, when using NORM transport.
RETURN VALUE
------------
The _zmq_setsockopt()_ function shall return zero if successful. Otherwise it

View File

@ -681,6 +681,21 @@ ZMQ_EXPORT void zmq_threadclose (void *thread_);
#define ZMQ_HICCUP_MSG 114
#define ZMQ_XSUB_VERBOSE_UNSUBSCRIBE 115
#define ZMQ_TOPICS_COUNT 116
#define ZMQ_NORM_MODE 117
#define ZMQ_NORM_UNICAST_NACK 118
#define ZMQ_NORM_BUFFER_SIZE 119
#define ZMQ_NORM_SEGMENT_SIZE 120
#define ZMQ_NORM_BLOCK_SIZE 121
#define ZMQ_NORM_NUM_PARITY 122
#define ZMQ_NORM_NUM_AUTOPARITY 123
#define ZMQ_NORM_PUSH 124
/* DRAFT ZMQ_NORM_MODE options */
#define ZMQ_NORM_FIXED 0
#define ZMQ_NORM_CC 1
#define ZMQ_NORM_CCL 2
#define ZMQ_NORM_CCE 3
#define ZMQ_NORM_CCE_ECNONLY 4
/* DRAFT ZMQ_RECONNECT_STOP options */
#define ZMQ_RECONNECT_STOP_CONN_REFUSED 0x1

View File

@ -131,9 +131,7 @@ int zmq::norm_engine_t::init (const char *network_, bool send, bool recv)
return -1;
}
// There's many other useful NORM options that could be applied here
if (NormIsUnicastAddress (addr)) {
NormSetDefaultUnicastNack (norm_session, true);
} else {
if (!NormIsUnicastAddress (addr)) {
// These only apply for multicast sessions
//NormSetTTL(norm_session, options.multicast_hops); // ZMQ default is 1
NormSetTTL (
@ -150,13 +148,31 @@ int zmq::norm_engine_t::init (const char *network_, bool send, bool recv)
NormSetMulticastInterface (norm_session, ifacePtr);
}
}
if (NormIsUnicastAddress (addr) || options.norm_unicast_nacks) {
NormSetDefaultUnicastNack (norm_session, true);
}
// Set TOS but check TOS ECN bit for CCE modes
if ((options.norm_mode == ZMQ_NORM_CCE
|| options.norm_mode == ZMQ_NORM_CCE_ECNONLY)
&& (options.tos % 4 == 0)) {
// ECN Capable Transport not set, so set it
NormSetTOS (norm_session, options.tos + 1);
} else if ((options.norm_mode == ZMQ_NORM_CCE
|| options.norm_mode == ZMQ_NORM_CCE_ECNONLY)
&& (options.tos % 4 == 3)) {
// Congestion Experienced is an invalid setting, remove one of the bits
NormSetTOS (norm_session, options.tos - 1);
} else {
NormSetTOS (norm_session, options.tos);
}
if (recv) {
// The alternative NORM_SYNC_CURRENT here would provide "instant"
// receiver sync to the sender's _current_ message transmission.
// NORM_SYNC_STREAM tries to get everything the sender has cached/buffered
NormSetDefaultSyncPolicy (norm_session, NORM_SYNC_STREAM);
if (!NormStartReceiver (norm_session, 2 * 1024 * 1024)) {
if (!NormStartReceiver (
norm_session, (unsigned long) options.norm_buffer_size * 1024)) {
// errno set by whatever failed
int savedErrno = errno;
NormDestroyInstance (norm_instance); // session gets closed, too
@ -169,11 +185,22 @@ int zmq::norm_engine_t::init (const char *network_, bool send, bool recv)
}
if (send) {
// Handle invalid settings -- num_parity must be >= num_autoparity (which has a default of 0)
unsigned char numparity =
(options.norm_num_parity >= options.norm_num_autoparity
? options.norm_num_parity
: options.norm_num_autoparity);
// Handle invalid settings -- block size must be > effective num_parity (which is <255)
unsigned char blocksize =
(options.norm_block_size > numparity ? options.norm_block_size
: numparity + 1);
// Pick a random sender instance id (aka norm sender session id)
NormSessionId instanceId = NormGetRandomSessionId ();
// TBD - provide "options" for some NORM sender parameters
if (!NormStartSender (norm_session, instanceId, 2 * 1024 * 1024, 1400,
16, 4)) {
if (!NormStartSender (norm_session, instanceId,
(unsigned long) options.norm_buffer_size * 1024,
options.norm_segment_size, blocksize,
numparity)) {
// errno set by whatever failed
int savedErrno = errno;
NormDestroyInstance (norm_instance); // session gets closed, too
@ -182,12 +209,29 @@ int zmq::norm_engine_t::init (const char *network_, bool send, bool recv)
errno = savedErrno;
return -1;
}
NormSetCongestionControl (norm_session, true);
// Handle NORM mode
if (options.norm_mode == ZMQ_NORM_FIXED) {
NormSetTxRate (norm_session, (double) options.rate * 1000);
} else {
NormSetCongestionControl (norm_session, true);
if (options.norm_mode != ZMQ_NORM_CC) {
NormSetEcnSupport (
norm_session,
((options.norm_mode == ZMQ_NORM_CCE)
|| (options.norm_mode == ZMQ_NORM_CCE_ECNONLY)),
options.norm_mode == ZMQ_NORM_CCE_ECNONLY,
options.norm_mode == ZMQ_NORM_CCL);
}
}
if (options.norm_num_autoparity > 0) {
NormSetAutoParity (norm_session, options.norm_num_autoparity);
}
norm_tx_ready = true;
is_sender = true;
if (NORM_OBJECT_INVALID
== (norm_tx_stream =
NormStreamOpen (norm_session, 2 * 1024 * 1024))) {
== (norm_tx_stream = NormStreamOpen (
norm_session,
(unsigned long) options.norm_buffer_size * 1024))) {
// errno set by whatever failed
int savedErrno = errno;
NormDestroyInstance (norm_instance); // session gets closed, too
@ -196,6 +240,8 @@ int zmq::norm_engine_t::init (const char *network_, bool send, bool recv)
errno = savedErrno;
return -1;
}
// NORM Stream options
NormStreamSetPushEnable (norm_tx_stream, options.norm_push_enable);
}
//NormSetMessageTrace(norm_session, true);

View File

@ -257,6 +257,14 @@ zmq::options_t::options_t () :
can_recv_disconnect_msg (false),
hiccup_msg (),
can_recv_hiccup_msg (false),
norm_mode (ZMQ_NORM_CC),
norm_unicast_nacks (false),
norm_buffer_size (2048),
norm_segment_size (1400),
norm_block_size (16),
norm_num_parity (4),
norm_num_autoparity (0),
norm_push_enable (false),
busy_poll (0)
{
memset (curve_public_key, 0, CURVE_KEYSIZE);
@ -832,6 +840,58 @@ int zmq::options_t::setsockopt (int option_,
&wss_trust_system);
#endif
#ifdef ZMQ_HAVE_NORM
case ZMQ_NORM_MODE:
if (is_int && value >= 0 && value <= 4) {
norm_mode = value;
return 0;
}
break;
case ZMQ_NORM_UNICAST_NACK:
return do_setsockopt_int_as_bool_strict (optval_, optvallen_,
&norm_unicast_nacks);
case ZMQ_NORM_BUFFER_SIZE:
if (is_int && value > 0) {
norm_buffer_size = value;
return 0;
}
break;
case ZMQ_NORM_SEGMENT_SIZE:
if (is_int && value > 0) {
norm_segment_size = value;
return 0;
}
break;
case ZMQ_NORM_BLOCK_SIZE:
if (is_int && value > 0 && value <= 255) {
norm_block_size = value;
return 0;
}
break;
case ZMQ_NORM_NUM_PARITY:
if (is_int && value >= 0 && value < 255) {
norm_num_parity = value;
return 0;
}
break;
case ZMQ_NORM_NUM_AUTOPARITY:
if (is_int && value >= 0 && value < 255) {
norm_num_autoparity = value;
return 0;
}
break;
case ZMQ_NORM_PUSH:
return do_setsockopt_int_as_bool_strict (optval_, optvallen_,
&norm_push_enable);
#endif //ZMQ_HAVE_NORM
case ZMQ_HELLO_MSG:
if (optvallen_ > 0) {
unsigned char *bytes = (unsigned char *) optval_;
@ -1312,6 +1372,65 @@ int zmq::options_t::getsockopt (int option_,
*value = busy_poll;
}
break;
#ifdef ZMQ_HAVE_NORM
case ZMQ_NORM_MODE:
if (is_int) {
*value = norm_mode;
return 0;
}
break;
case ZMQ_NORM_UNICAST_NACK:
if (is_int) {
*value = norm_unicast_nacks;
return 0;
}
break;
case ZMQ_NORM_BUFFER_SIZE:
if (is_int) {
*value = norm_buffer_size;
return 0;
}
break;
case ZMQ_NORM_SEGMENT_SIZE:
if (is_int) {
*value = norm_segment_size;
return 0;
}
break;
case ZMQ_NORM_BLOCK_SIZE:
if (is_int) {
*value = norm_block_size;
return 0;
}
break;
case ZMQ_NORM_NUM_PARITY:
if (is_int) {
*value = norm_num_parity;
return 0;
}
break;
case ZMQ_NORM_NUM_AUTOPARITY:
if (is_int) {
*value = norm_num_autoparity;
return 0;
}
break;
case ZMQ_NORM_PUSH:
if (is_int) {
*value = norm_push_enable;
return 0;
}
break;
#endif //ZMQ_HAVE_NORM
#endif

View File

@ -313,6 +313,16 @@ struct options_t
std::vector<unsigned char> hiccup_msg;
bool can_recv_hiccup_msg;
// NORM Options
int norm_mode;
bool norm_unicast_nacks;
int norm_buffer_size;
int norm_segment_size;
int norm_block_size;
int norm_num_parity;
int norm_num_autoparity;
bool norm_push_enable;
// This option removes several delays caused by scheduling, interrupts and context switching.
int busy_poll;
};

View File

@ -73,6 +73,21 @@
#define ZMQ_HICCUP_MSG 114
#define ZMQ_XSUB_VERBOSE_UNSUBSCRIBE 115
#define ZMQ_TOPICS_COUNT 116
#define ZMQ_NORM_MODE 117
#define ZMQ_NORM_UNICAST_NACK 118
#define ZMQ_NORM_BUFFER_SIZE 119
#define ZMQ_NORM_SEGMENT_SIZE 120
#define ZMQ_NORM_BLOCK_SIZE 121
#define ZMQ_NORM_NUM_PARITY 122
#define ZMQ_NORM_NUM_AUTOPARITY 123
#define ZMQ_NORM_PUSH 124
/* DRAFT ZMQ_NORM_MODE options */
#define ZMQ_NORM_FIXED 0
#define ZMQ_NORM_CC 1
#define ZMQ_NORM_CCL 2
#define ZMQ_NORM_CCE 3
#define ZMQ_NORM_CCE_ECNONLY 4
/* DRAFT ZMQ_RECONNECT_STOP options */
#define ZMQ_RECONNECT_STOP_CONN_REFUSED 0x1