Merge pull request #2981 from eponsko/master

Problem: no support for ZMTP 3.1 application metadata
This commit is contained in:
Luca Boccassi 2018-03-15 12:57:40 +00:00 committed by GitHub
commit e388774737
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 226 additions and 5 deletions

View File

@ -853,7 +853,8 @@ test_apps += tests/test_poller \
tests/test_radio_dish \
tests/test_udp \
tests/test_scatter_gather \
tests/test_dgram
tests/test_dgram \
tests/test_app_meta
tests_test_poller_SOURCES = tests/test_poller.cpp
tests_test_poller_LDADD = src/libzmq.la
@ -879,6 +880,10 @@ tests_test_scatter_gather_LDADD = src/libzmq.la
tests_test_dgram_SOURCES = tests/test_dgram.cpp
tests_test_dgram_LDADD = src/libzmq.la
tests_test_app_meta_SOURCES = tests/test_app_meta.cpp
tests_test_app_meta_LDADD = src/libzmq.la ${UNITY_LIBS}
tests_test_app_meta_CPPFLAGS = ${UNITY_CPPFLAGS}
endif
if ENABLE_STATIC

View File

@ -585,6 +585,7 @@ ZMQ_EXPORT void zmq_threadclose (void *thread);
#define ZMQ_BINDTODEVICE 92
#define ZMQ_ZAP_ENFORCE_DOMAIN 93
#define ZMQ_LOOPBACK_FASTPATH 94
#define ZMQ_METADATA 95
/* DRAFT 0MQ socket events and monitoring */
/* Unspecified system errors during handshake. Event value is an errno. */

View File

@ -170,13 +170,30 @@ size_t zmq::mechanism_t::add_basic_properties (unsigned char *buf,
options.routing_id, options.routing_id_size);
}
for (std::map<std::string, std::string>::const_iterator it =
options.app_metadata.begin ();
it != options.app_metadata.end (); ++it)
ptr +=
add_property (ptr, buf_capacity - (ptr - buf), it->first.c_str (),
it->second.c_str (), strlen (it->second.c_str ()));
return ptr - buf;
}
size_t zmq::mechanism_t::basic_properties_len () const
{
const char *socket_type = socket_type_string (options.type);
int meta_len = 0;
for (std::map<std::string, std::string>::const_iterator it =
options.app_metadata.begin ();
it != options.app_metadata.end (); ++it)
meta_len +=
property_len (it->first.c_str (), strlen (it->second.c_str ()));
return property_len (ZMTP_PROPERTY_SOCKET_TYPE, strlen (socket_type))
+ meta_len
+ ((options.type == ZMQ_REQ || options.type == ZMQ_DEALER
|| options.type == ZMQ_ROUTER)
? property_len (ZMTP_PROPERTY_IDENTITY, options.routing_id_size)

View File

@ -706,6 +706,26 @@ int zmq::options_t::setsockopt (int option_,
return do_setsockopt_int_as_bool_relaxed (optval_, optvallen_,
&loopback_fastpath);
case ZMQ_METADATA:
if (optvallen_ > 0 && !is_int) {
std::string s ((char *) optval_);
size_t pos = 0;
std::string key, val, delimiter = ":";
pos = s.find (delimiter);
if (pos != std::string::npos && pos != 0
&& pos != s.length () - 1) {
key = s.substr (0, pos);
if (key.compare (0, 2, "X-") == 0 && key.length () < 256) {
val = s.substr (pos + 1, s.length ());
app_metadata.insert (
std::pair<std::string, std::string> (key, val));
return 0;
}
}
}
errno = EINVAL;
return -1;
break;
default:
#if defined(ZMQ_ACT_MILITANT)
// There are valid scenarios for probing with unknown socket option

View File

@ -33,6 +33,7 @@
#include <string>
#include <vector>
#include <set>
#include <map>
#include "atomic_ptr.hpp"
#include "stddef.h"
@ -258,6 +259,9 @@ struct options_t
// Use zero copy strategy for storing message content when decoding.
bool zero_copy;
// Application metadata
std::map<std::string, std::string> app_metadata;
};
int do_getsockopt (void *const optval_,

View File

@ -56,6 +56,7 @@ unsigned long zmq_stopwatch_intermediate (void *watch_);
#define ZMQ_BINDTODEVICE 92
#define ZMQ_ZAP_ENFORCE_DOMAIN 93
#define ZMQ_LOOPBACK_FASTPATH 94
#define ZMQ_METADATA 95
/* DRAFT 0MQ socket events and monitoring */
/* Unspecified system errors during handshake. Event value is an errno. */

View File

@ -135,6 +135,7 @@ IF (ENABLE_DRAFTS)
test_udp
test_scatter_gather
test_dgram
test_app_meta
)
ENDIF (ENABLE_DRAFTS)

View File

@ -30,20 +30,17 @@
#include "testutil.hpp"
#include <unity.h>
void *ctx;
void setUp ()
{
ctx = zmq_ctx_new ();
}
void tearDown ()
{
zmq_ctx_term (ctx);
ctx = NULL;
}
void test_tipc_port_name_and_domain ()
{
void *ctx = zmq_ctx_new ();
TEST_ASSERT_NOT_NULL (ctx);
// test Port Name addressing
@ -64,6 +61,8 @@ void test_tipc_port_name_and_domain ()
rc = zmq_close (sb);
TEST_ASSERT_EQUAL_INT (0, rc);
zmq_ctx_term (ctx);
}
void test_tipc_port_identity ()
@ -72,6 +71,7 @@ void test_tipc_port_identity ()
size_t size = 256;
unsigned int z, c, n, ref;
void *ctx = zmq_ctx_new ();
TEST_ASSERT_NOT_NULL (ctx);
void *sb = zmq_socket (ctx, ZMQ_REP);
@ -101,10 +101,13 @@ void test_tipc_port_identity ()
rc = zmq_close (sb);
TEST_ASSERT_EQUAL_INT (0, rc);
zmq_ctx_term (ctx);
}
void test_tipc_bad_addresses ()
{
void *ctx = zmq_ctx_new ();
TEST_ASSERT_NOT_NULL (ctx);
// Test Port Name addressing
@ -124,6 +127,8 @@ void test_tipc_bad_addresses ()
// Clean up
rc = zmq_close (sb);
TEST_ASSERT_EQUAL_INT (0, rc);
zmq_ctx_term (ctx);
}

167
tests/test_app_meta.cpp Normal file
View File

@ -0,0 +1,167 @@
/*
Copyright (c) 2007-2018 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
libzmq is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License (LGPL) as published
by the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
As a special exception, the Contributors give you permission to link
this library with independent modules to produce an executable,
regardless of the license terms of these independent modules, and to
copy and distribute the resulting executable under terms of your choice,
provided that you also meet, for each linked independent module, the
terms and conditions of the license of that module. An independent
module is a module which is not derived from or based on this library.
If you modify this library, you must extend this exception to your
version of the library.
libzmq is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "testutil.hpp"
#include <unity.h>
void setUp ()
{
}
void tearDown ()
{
}
void test_app_meta_reqrep ()
{
void *ctx;
zmq_msg_t msg;
void *rep_sock, *req_sock;
const char *req_hello = "X-hello:hello";
const char *req_connection = "X-connection:primary";
const char *req_z85 = "X-bin:009c6";
const char *rep_hello = "X-hello:world";
const char *rep_connection = "X-connection:backup";
const char *bad_strings[] = {
":",
"key:",
":value",
"keyvalue",
"",
"X-"
"KeyTooLongKeyTooLongKeyTooLongKeyTooLongKeyTooLongKeyTooLongKeyTooLongKe"
"yTooLongKeyTooLongKeyTooLongKeyTooLongKeyTooLongKeyTooLongKeyTooLongKeyT"
"ooLongKeyTooLongKeyTooLongKeyTooLongKeyTooLongKeyTooLongKeyTooLongKeyToo"
"LongKeyTooLongKeyTooLongKeyTooLongKeyTooLong:value"};
ctx = zmq_ctx_new ();
rep_sock = zmq_socket (ctx, ZMQ_REP);
TEST_ASSERT_NOT_NULL (rep_sock);
req_sock = zmq_socket (ctx, ZMQ_REQ);
TEST_ASSERT_NOT_NULL (req_sock);
int rc =
zmq_setsockopt (rep_sock, ZMQ_METADATA, rep_hello, strlen (rep_hello));
TEST_ASSERT_EQUAL_INT (0, rc);
int l = 0;
rc = zmq_setsockopt (rep_sock, ZMQ_LINGER, &l, sizeof (l));
TEST_ASSERT_EQUAL_INT (0, rc);
rc = zmq_setsockopt (rep_sock, ZMQ_METADATA, rep_connection,
strlen (rep_connection));
TEST_ASSERT_EQUAL_INT (0, rc);
for (int i = 0; i < 6; i++) {
rc = zmq_setsockopt (rep_sock, ZMQ_METADATA, bad_strings[i],
strlen (bad_strings[i]));
TEST_ASSERT_EQUAL_INT (-1, rc);
}
rc = zmq_bind (rep_sock, "tcp://127.0.0.1:5555");
TEST_ASSERT_EQUAL_INT (0, rc);
l = 0;
rc = zmq_setsockopt (req_sock, ZMQ_LINGER, &l, sizeof (l));
TEST_ASSERT_EQUAL_INT (0, rc);
rc = zmq_setsockopt (req_sock, ZMQ_METADATA, req_hello, strlen (req_hello));
TEST_ASSERT_EQUAL_INT (0, rc);
rc = zmq_setsockopt (req_sock, ZMQ_METADATA, req_connection,
strlen (req_connection));
TEST_ASSERT_EQUAL_INT (0, rc);
rc = zmq_setsockopt (req_sock, ZMQ_METADATA, req_z85, strlen (req_z85));
TEST_ASSERT_EQUAL_INT (0, rc);
rc = zmq_connect (req_sock, "tcp://127.0.0.1:5555");
TEST_ASSERT_EQUAL_INT (0, rc);
rc = zmq_msg_init_size (&msg, 1);
TEST_ASSERT_EQUAL_INT (0, rc);
char *data = (char *) zmq_msg_data (&msg);
data[0] = 1;
rc = zmq_msg_send (&msg, req_sock, 0);
TEST_ASSERT_EQUAL_INT (1, rc);
rc = zmq_msg_init (&msg);
TEST_ASSERT_EQUAL_INT (0, rc);
rc = zmq_msg_recv (&msg, rep_sock, 0);
TEST_ASSERT_EQUAL_INT (1, rc);
TEST_ASSERT_EQUAL_STRING ("hello", zmq_msg_gets (&msg, "X-hello"));
TEST_ASSERT_EQUAL_STRING ("primary", zmq_msg_gets (&msg, "X-connection"));
char *bindata = (char *) zmq_msg_gets (&msg, "X-bin");
TEST_ASSERT_NOT_NULL (bindata);
uint8_t rawdata[4];
void *ret = zmq_z85_decode (rawdata, bindata);
TEST_ASSERT_NOT_NULL (ret);
TEST_ASSERT_EQUAL_UINT8 (0, rawdata[0]);
TEST_ASSERT_EQUAL_UINT8 (1, rawdata[1]);
TEST_ASSERT_EQUAL_UINT8 (2, rawdata[2]);
TEST_ASSERT_EQUAL_UINT8 (3, rawdata[3]);
TEST_ASSERT_NULL (zmq_msg_gets (&msg, "X-foobar"));
TEST_ASSERT_NULL (zmq_msg_gets (&msg, "foobar"));
rc = zmq_msg_send (&msg, rep_sock, 0);
TEST_ASSERT_EQUAL_INT (1, rc);
rc = zmq_msg_recv (&msg, req_sock, 0);
TEST_ASSERT_EQUAL_INT (1, rc);
TEST_ASSERT_EQUAL_STRING ("world", zmq_msg_gets (&msg, "X-hello"));
TEST_ASSERT_EQUAL_STRING ("backup", zmq_msg_gets (&msg, "X-connection"));
rc = zmq_msg_close (&msg);
TEST_ASSERT_EQUAL_INT (0, rc);
rc = zmq_close (req_sock);
TEST_ASSERT_EQUAL_INT (0, rc);
rc = zmq_close (rep_sock);
TEST_ASSERT_EQUAL_INT (0, rc);
zmq_ctx_term (ctx);
}
int main ()
{
setup_test_environment ();
UNITY_BEGIN ();
RUN_TEST (test_app_meta_reqrep);
return UNITY_END ();
}