From b9c274814697ef349638989792b3588d0b06b911 Mon Sep 17 00:00:00 2001 From: Martin Hurton Date: Wed, 30 Apr 2014 14:17:38 +0200 Subject: [PATCH] Add metadata to received messages --- src/Makefile.am | 2 ++ src/curve_server.cpp | 2 +- src/gssapi_server.cpp | 2 +- src/i_properties.hpp | 4 ++- src/mechanism.cpp | 16 ++++++++++- src/mechanism.hpp | 10 ++++++- src/metadata.cpp | 49 ++++++++++++++++++++++++++++++++++ src/metadata.hpp | 59 +++++++++++++++++++++++++++++++++++++++++ src/null_mechanism.cpp | 2 +- src/plain_mechanism.cpp | 2 +- src/stream_engine.cpp | 3 +++ src/zmq.cpp | 2 +- 12 files changed, 145 insertions(+), 8 deletions(-) create mode 100644 src/metadata.cpp create mode 100644 src/metadata.hpp diff --git a/src/Makefile.am b/src/Makefile.am index f5de3a69..3551233e 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -46,6 +46,7 @@ libzmq_la_SOURCES = \ likely.hpp \ mailbox.hpp \ mechanism.hpp \ + metadata.hpp \ msg.hpp \ mtrie.hpp \ mutex.hpp \ @@ -118,6 +119,7 @@ libzmq_la_SOURCES = \ lb.cpp \ mailbox.cpp \ mechanism.cpp \ + metadata.cpp \ msg.cpp \ mtrie.cpp \ norm_engine.cpp \ diff --git a/src/curve_server.cpp b/src/curve_server.cpp index dcaed570..e5083028 100644 --- a/src/curve_server.cpp +++ b/src/curve_server.cpp @@ -663,7 +663,7 @@ int zmq::curve_server_t::receive_and_process_zap_reply () // Process metadata frame rc = parse_metadata (static_cast (msg [6].data ()), - msg [6].size ()); + msg [6].size (), true); error: for (int i = 0; i < 7; i++) { diff --git a/src/gssapi_server.cpp b/src/gssapi_server.cpp index c1fabf88..f55db538 100644 --- a/src/gssapi_server.cpp +++ b/src/gssapi_server.cpp @@ -268,7 +268,7 @@ int zmq::gssapi_server_t::receive_and_process_zap_reply () // Process metadata frame rc = parse_metadata (static_cast (msg [6].data ()), - msg [6].size ()); + msg [6].size (), true); error: for (int i = 0; i < 7; i++) { diff --git a/src/i_properties.hpp b/src/i_properties.hpp index 043e671c..e2f0203c 100644 --- a/src/i_properties.hpp +++ b/src/i_properties.hpp @@ -20,6 +20,8 @@ #ifndef __ZMQ_I_PROPERTIES_HPP_INCLUDED__ #define __ZMQ_I_PROPERTIES_HPP_INCLUDED__ +#include + namespace zmq { // Interface for accessing message properties. @@ -32,7 +34,7 @@ namespace zmq // Returns pointer to property value or NULL if // property not found. - virtual const char *get (const char *property) const = 0; + virtual const char *get (const std::string &property) const = 0; virtual void add_ref () = 0; diff --git a/src/mechanism.cpp b/src/mechanism.cpp index 85d96905..a4e1bace 100644 --- a/src/mechanism.cpp +++ b/src/mechanism.cpp @@ -18,6 +18,7 @@ */ #include +#include #include "mechanism.hpp" #include "options.hpp" @@ -26,12 +27,16 @@ #include "wire.hpp" zmq::mechanism_t::mechanism_t (const options_t &options_) : + metadata (NULL), options (options_) { } zmq::mechanism_t::~mechanism_t () { + if (metadata != NULL) + if (metadata->drop_ref ()) + delete metadata; } void zmq::mechanism_t::set_peer_identity (const void *id_ptr, size_t id_size) @@ -83,8 +88,9 @@ size_t zmq::mechanism_t::add_property (unsigned char *ptr, const char *name, } int zmq::mechanism_t::parse_metadata (const unsigned char *ptr_, - size_t length_) + size_t length_, bool zap_flag) { + std::map dict; size_t bytes_left = length_; while (bytes_left > 1) { @@ -125,11 +131,19 @@ int zmq::mechanism_t::parse_metadata (const unsigned char *ptr_, if (rc == -1) return -1; } + + dict.insert ( + std::map ::value_type ( + name, std::string ((char *) value, value_length))); } if (bytes_left > 0) { errno = EPROTO; return -1; } + if (zap_flag) { + assert (metadata == NULL); + metadata = new (std::nothrow) metadata_t (dict); + } return 0; } diff --git a/src/mechanism.hpp b/src/mechanism.hpp index 9f60423f..3b3f5b7c 100644 --- a/src/mechanism.hpp +++ b/src/mechanism.hpp @@ -23,6 +23,7 @@ #include "stdint.hpp" #include "options.hpp" #include "blob.hpp" +#include "metadata.hpp" namespace zmq { @@ -64,6 +65,8 @@ namespace zmq blob_t get_user_id () const; + metadata_t *get_metadata () { return metadata; } + protected: // Only used to identify the socket for the Socket-Type @@ -77,7 +80,8 @@ namespace zmq // Metadata consists of a list of properties consisting of // name and value as size-specified strings. // Returns 0 on success and -1 on error, in which case errno is set. - int parse_metadata (const unsigned char *ptr_, size_t length); + int parse_metadata ( + const unsigned char *ptr_, size_t length, bool zap_flag = false); // This is called by parse_property method whenever it // parses a new property. The function should return 0 @@ -89,6 +93,10 @@ namespace zmq virtual int property (const std::string& name_, const void *value_, size_t length_); + // Metadada as returned by ZAP protocol. + // NULL if no metadata received. + metadata_t *metadata; + options_t options; private: diff --git a/src/metadata.cpp b/src/metadata.cpp new file mode 100644 index 00000000..357d87cb --- /dev/null +++ b/src/metadata.cpp @@ -0,0 +1,49 @@ +/* + Copyright (c) 2007-2014 Contributors as noted in the AUTHORS file + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include "metadata.hpp" + +zmq::metadata_t::metadata_t (dict_t &dict) : + ref_cnt (1), + dict (dict) +{ +} + +zmq::metadata_t::~metadata_t () +{ +} + +const char *zmq::metadata_t::get (const std::string &property) const +{ + dict_t::const_iterator it = dict.find (property); + if (it == dict.end ()) + return NULL; + else + return it->second.c_str (); +} + +void zmq::metadata_t::add_ref () +{ + ref_cnt.add (1); +} + +bool zmq::metadata_t::drop_ref () +{ + return !ref_cnt.sub (1); +} diff --git a/src/metadata.hpp b/src/metadata.hpp new file mode 100644 index 00000000..72ab6fa9 --- /dev/null +++ b/src/metadata.hpp @@ -0,0 +1,59 @@ +/* + Copyright (c) 2007-2014 Contributors as noted in the AUTHORS file + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#ifndef __ZMQ_METADATA_HPP_INCLUDED__ +#define __ZMQ_METADATA_HPP_INCLUDED__ + +#include + +#include "atomic_counter.hpp" +#include "i_properties.hpp" + +namespace zmq +{ + class metadata_t : public i_properties + { + public: + + metadata_t (std::map &dict); + virtual ~metadata_t (); + + // Returns pointer to property value or NULL if + // property is not found. + virtual const char *get (const std::string &property) const; + + virtual void add_ref (); + + // Drop reference. Returns true iff the reference + // counter drops to zero. + virtual bool drop_ref (); + + private: + + // Reference counter. + atomic_counter_t ref_cnt; + + // Dictionary holding metadata. + typedef std::map dict_t; + const dict_t dict; + }; + +} + +#endif diff --git a/src/null_mechanism.cpp b/src/null_mechanism.cpp index 3770d791..141c4dff 100644 --- a/src/null_mechanism.cpp +++ b/src/null_mechanism.cpp @@ -285,7 +285,7 @@ int zmq::null_mechanism_t::receive_and_process_zap_reply () // Process metadata frame rc = parse_metadata (static_cast (msg [6].data ()), - msg [6].size ()); + msg [6].size (), true); error: for (int i = 0; i < 7; i++) { diff --git a/src/plain_mechanism.cpp b/src/plain_mechanism.cpp index e6e3feb1..f6f6413e 100644 --- a/src/plain_mechanism.cpp +++ b/src/plain_mechanism.cpp @@ -502,7 +502,7 @@ int zmq::plain_mechanism_t::receive_and_process_zap_reply () // Process metadata frame rc = parse_metadata (static_cast (msg [6].data ()), - msg [6].size ()); + msg [6].size (), true); error: for (int i = 0; i < 7; i++) { diff --git a/src/stream_engine.cpp b/src/stream_engine.cpp index c90ea885..aac93da6 100644 --- a/src/stream_engine.cpp +++ b/src/stream_engine.cpp @@ -780,6 +780,9 @@ int zmq::stream_engine_t::decode_and_push (msg_t *msg_) if (mechanism->decode (msg_) == -1) return -1; + metadata_t *metadata = mechanism->get_metadata (); + if (metadata) + msg_->set_properties (metadata); if (session->push_msg (msg_) == -1) { if (errno == EAGAIN) write_msg = &stream_engine_t::push_one_then_decode_and_push; diff --git a/src/zmq.cpp b/src/zmq.cpp index 69a94d1d..1a2b3b74 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -649,7 +649,7 @@ const char *zmq_msg_gets (zmq_msg_t *msg_, const char *property_) { zmq::i_properties *properties = ((zmq::msg_t*) msg_)->properties (); if (properties) - return properties->get (property_); + return properties->get (std::string (property_)); else return NULL; }