Merge pull request #318 from shripchenko/master

2nd try wuth sock->unbind() and sock->disconnect(). now with const char*'s argument
This commit is contained in:
Pieter Hintjens 2012-04-20 09:11:56 -07:00
commit 653e5854ed
7 changed files with 56 additions and 46 deletions

View File

@ -227,7 +227,6 @@ ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int option, int optval);
#define ZMQ_TCP_KEEPALIVE_IDLE 36
#define ZMQ_TCP_KEEPALIVE_INTVL 37
#define ZMQ_TCP_ACCEPT_FILTER 38
#define ZMQ_LAST_ENDPOINT_ID 39
/* Message options */
@ -245,8 +244,8 @@ ZMQ_EXPORT int zmq_getsockopt (void *s, int option, void *optval,
size_t *optvallen);
ZMQ_EXPORT int zmq_bind (void *s, const char *addr);
ZMQ_EXPORT int zmq_connect (void *s, const char *addr);
ZMQ_EXPORT int zmq_unbind (void *s, void *ep);
ZMQ_EXPORT int zmq_disconnect (void *s, void *ep);
ZMQ_EXPORT int zmq_unbind (void *s, const char *addr);
ZMQ_EXPORT int zmq_disconnect (void *s, const char *addr);
ZMQ_EXPORT int zmq_send (void *s, const void *buf, size_t len, int flags);
ZMQ_EXPORT int zmq_recv (void *s, void *buf, size_t len, int flags);

View File

@ -30,7 +30,6 @@ zmq::options_t::options_t () :
rcvhwm (1000),
affinity (0),
identity_size (0),
last_endpoint_id(NULL),
rate (100),
recovery_ivl (10000),
multicast_hops (1),
@ -531,14 +530,6 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_)
*optvallen_ = last_endpoint.size()+1;
return 0;
case ZMQ_LAST_ENDPOINT_ID:
if (*optvallen_ < sizeof (void *)) {
errno = EINVAL;
return -1;
}
*((void **) optval_) = last_endpoint_id;
*optvallen_ = sizeof (void *);
return 0;
}
errno = EINVAL;

View File

@ -51,10 +51,8 @@ namespace zmq
unsigned char identity_size;
unsigned char identity [256];
// Last socket endpoint URI
// Last socket endpoint resolved URI
std::string last_endpoint;
// Last socket endpoint ID
void *last_endpoint_id;
// Maximum tranfer rate [kb/s]. Default 100kb/s.
int rate;

View File

@ -319,9 +319,8 @@ int zmq::socket_base_t::bind (const char *addr_)
endpoint_t endpoint = {this, options};
int rc = register_endpoint (addr_, endpoint);
if (rc == 0) {
// Save last endpoint info
options.last_endpoint.clear ();
options.last_endpoint_id = NULL;
// Save last endpoint URI
options.last_endpoint.assign (addr_);
}
return rc;
}
@ -350,11 +349,10 @@ int zmq::socket_base_t::bind (const char *addr_)
return -1;
}
// Save last endpoint info
options.last_endpoint_id = (void *) ((own_t *) listener);
// Save last endpoint URI
listener->get_address (options.last_endpoint);
launch_child (listener);
add_endpoint (addr_, (own_t *) listener);
return 0;
}
@ -369,11 +367,10 @@ int zmq::socket_base_t::bind (const char *addr_)
return -1;
}
// Save last endpoint info
options.last_endpoint_id = (void *) ((own_t *) listener);
// Save last endpoint URI
listener->get_address (options.last_endpoint);
launch_child (listener);
add_endpoint (addr_, (own_t *) listener);
return 0;
}
#endif
@ -464,9 +461,8 @@ int zmq::socket_base_t::connect (const char *addr_)
// increased here.
send_bind (peer.socket, pipes [1], false);
// Save last endpoint info
options.last_endpoint.clear ();
options.last_endpoint_id = NULL;
// Save last endpoint URI
options.last_endpoint.assign (addr_);
return 0;
}
@ -528,13 +524,42 @@ int zmq::socket_base_t::connect (const char *addr_)
// Attach remote end of the pipe to the session object later on.
session->attach_pipe (pipes [1]);
// Save last endpoint info
// Save last endpoint URI
paddr->to_string (options.last_endpoint);
options.last_endpoint_id = (void *) ((own_t *) session);
add_endpoint (addr_, (own_t *) session);
return 0;
}
void zmq::socket_base_t::add_endpoint (const char *addr_, own_t *endpoint_)
{
// Activate the session. Make it a child of this socket.
launch_child (session);
launch_child (endpoint_);
endpoints.insert (std::make_pair (std::string (addr_), endpoint_));
}
int zmq::socket_base_t::term_endpoint (const char *addr_)
{
// Check whether the library haven't been shut down yet.
if (unlikely (ctx_terminated)) {
errno = ETERM;
return -1;
}
// Check whether message passed to the function is valid.
if (unlikely (!addr_)) {
errno = EINVAL;
return -1;
}
// Find the endpoints range (if any) corresponding to the addr_ string.
std::pair <endpoints_t::iterator, endpoints_t::iterator> range = endpoints.equal_range (std::string (addr_));
if (range.first == range.second)
return -1;
for (endpoints_t::iterator it = range.first; it != range.second; ++it)
term_child (it->second);
endpoints.erase (range.first, range.second);
return 0;
}
@ -603,16 +628,6 @@ int zmq::socket_base_t::send (msg_t *msg_, int flags_)
return 0;
}
int zmq::socket_base_t::term_endpoint (void *ep_)
{
if (unlikely (ctx_terminated)) {
errno = ETERM;
return -1;
}
term_child ((own_t *) ep_);
return 0;
}
int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
{
// Check whether the library haven't been shut down yet.

View File

@ -24,6 +24,7 @@
#define __ZMQ_SOCKET_BASE_HPP_INCLUDED__
#include <string>
#include <map>
#include "own.hpp"
#include "array.hpp"
@ -72,7 +73,7 @@ namespace zmq
int getsockopt (int option_, void *optval_, size_t *optvallen_);
int bind (const char *addr_);
int connect (const char *addr_);
int term_endpoint (void *ep_);
int term_endpoint (const char *addr_);
int send (zmq::msg_t *msg_, int flags_);
int recv (zmq::msg_t *msg_, int flags_);
int close ();
@ -133,6 +134,12 @@ namespace zmq
void process_destroy ();
private:
// Creates new endpoint ID and adds the endpoint to the map.
void add_endpoint (const char *addr_, own_t *endpoint_);
// Map of open endpoints.
typedef std::multimap <std::string, own_t *> endpoints_t;
endpoints_t endpoints;
// To be called after processing commands or invoking any command
// handlers explicitly. If required, it will deallocate the socket.

View File

@ -40,7 +40,7 @@ namespace zmq
tcp_address_t ();
tcp_address_t (const sockaddr *sa, socklen_t sa_len);
~tcp_address_t ();
virtual ~tcp_address_t ();
// This function translates textual TCP address into an address
// strcuture. If 'local' is true, names are resolved as local interface

View File

@ -293,24 +293,24 @@ int zmq_connect (void *s_, const char *addr_)
return result;
}
int zmq_unbind (void *s_, void *ep_)
int zmq_unbind (void *s_, const char *addr_)
{
if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
errno = ENOTSOCK;
return -1;
}
zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
return s->term_endpoint (ep_);
return s->term_endpoint (addr_);
}
int zmq_disconnect (void *s_, void *ep_)
int zmq_disconnect (void *s_, const char *addr_)
{
if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
errno = ENOTSOCK;
return -1;
}
zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
return s->term_endpoint (ep_);
return s->term_endpoint (addr_);
}
// Sending functions.