mirror of
https://github.com/zeromq/libzmq.git
synced 2024-12-13 10:52:56 +01:00
2nd try wuth sock->unbind() and sock->disconnect(). now with blackjack and const char*'s
This commit is contained in:
parent
7b8e728e43
commit
489481857a
@ -227,7 +227,6 @@ ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int option, int optval);
|
||||
#define ZMQ_TCP_KEEPALIVE_IDLE 36
|
||||
#define ZMQ_TCP_KEEPALIVE_INTVL 37
|
||||
#define ZMQ_TCP_ACCEPT_FILTER 38
|
||||
#define ZMQ_LAST_ENDPOINT_ID 39
|
||||
|
||||
|
||||
/* Message options */
|
||||
@ -245,8 +244,8 @@ ZMQ_EXPORT int zmq_getsockopt (void *s, int option, void *optval,
|
||||
size_t *optvallen);
|
||||
ZMQ_EXPORT int zmq_bind (void *s, const char *addr);
|
||||
ZMQ_EXPORT int zmq_connect (void *s, const char *addr);
|
||||
ZMQ_EXPORT int zmq_unbind (void *s, void *ep);
|
||||
ZMQ_EXPORT int zmq_disconnect (void *s, void *ep);
|
||||
ZMQ_EXPORT int zmq_unbind (void *s, const char *addr);
|
||||
ZMQ_EXPORT int zmq_disconnect (void *s, const char *addr);
|
||||
ZMQ_EXPORT int zmq_send (void *s, const void *buf, size_t len, int flags);
|
||||
ZMQ_EXPORT int zmq_recv (void *s, void *buf, size_t len, int flags);
|
||||
|
||||
|
@ -30,7 +30,6 @@ zmq::options_t::options_t () :
|
||||
rcvhwm (1000),
|
||||
affinity (0),
|
||||
identity_size (0),
|
||||
last_endpoint_id(NULL),
|
||||
rate (100),
|
||||
recovery_ivl (10000),
|
||||
multicast_hops (1),
|
||||
@ -531,14 +530,6 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_)
|
||||
*optvallen_ = last_endpoint.size()+1;
|
||||
return 0;
|
||||
|
||||
case ZMQ_LAST_ENDPOINT_ID:
|
||||
if (*optvallen_ < sizeof (void *)) {
|
||||
errno = EINVAL;
|
||||
return -1;
|
||||
}
|
||||
*((void **) optval_) = last_endpoint_id;
|
||||
*optvallen_ = sizeof (void *);
|
||||
return 0;
|
||||
}
|
||||
|
||||
errno = EINVAL;
|
||||
|
@ -51,10 +51,8 @@ namespace zmq
|
||||
unsigned char identity_size;
|
||||
unsigned char identity [256];
|
||||
|
||||
// Last socket endpoint URI
|
||||
// Last socket endpoint resolved URI
|
||||
std::string last_endpoint;
|
||||
// Last socket endpoint ID
|
||||
void *last_endpoint_id;
|
||||
|
||||
// Maximum tranfer rate [kb/s]. Default 100kb/s.
|
||||
int rate;
|
||||
|
@ -320,9 +320,8 @@ int zmq::socket_base_t::bind (const char *addr_)
|
||||
endpoint_t endpoint = {this, options};
|
||||
int rc = register_endpoint (addr_, endpoint);
|
||||
if (rc == 0) {
|
||||
// Save last endpoint info
|
||||
options.last_endpoint.clear ();
|
||||
options.last_endpoint_id = NULL;
|
||||
// Save last endpoint URI
|
||||
options.last_endpoint.assign (addr_);
|
||||
}
|
||||
return rc;
|
||||
}
|
||||
@ -351,11 +350,10 @@ int zmq::socket_base_t::bind (const char *addr_)
|
||||
return -1;
|
||||
}
|
||||
|
||||
// Save last endpoint info
|
||||
options.last_endpoint_id = (void *) ((own_t *) listener);
|
||||
// Save last endpoint URI
|
||||
listener->get_address (options.last_endpoint);
|
||||
|
||||
launch_child (listener);
|
||||
add_endpoint (addr_, (own_t *) listener);
|
||||
return 0;
|
||||
}
|
||||
|
||||
@ -370,11 +368,10 @@ int zmq::socket_base_t::bind (const char *addr_)
|
||||
return -1;
|
||||
}
|
||||
|
||||
// Save last endpoint info
|
||||
options.last_endpoint_id = (void *) ((own_t *) listener);
|
||||
// Save last endpoint URI
|
||||
listener->get_address (options.last_endpoint);
|
||||
|
||||
launch_child (listener);
|
||||
add_endpoint (addr_, (own_t *) listener);
|
||||
return 0;
|
||||
}
|
||||
#endif
|
||||
@ -465,9 +462,8 @@ int zmq::socket_base_t::connect (const char *addr_)
|
||||
// increased here.
|
||||
send_bind (peer.socket, pipes [1], false);
|
||||
|
||||
// Save last endpoint info
|
||||
options.last_endpoint.clear ();
|
||||
options.last_endpoint_id = NULL;
|
||||
// Save last endpoint URI
|
||||
options.last_endpoint.assign (addr_);
|
||||
|
||||
return 0;
|
||||
}
|
||||
@ -529,13 +525,42 @@ int zmq::socket_base_t::connect (const char *addr_)
|
||||
// Attach remote end of the pipe to the session object later on.
|
||||
session->attach_pipe (pipes [1]);
|
||||
|
||||
// Save last endpoint info
|
||||
// Save last endpoint URI
|
||||
paddr->to_string (options.last_endpoint);
|
||||
options.last_endpoint_id = (void *) ((own_t *) session);
|
||||
|
||||
add_endpoint (addr_, (own_t *) session);
|
||||
return 0;
|
||||
}
|
||||
|
||||
void zmq::socket_base_t::add_endpoint (const char *addr_, own_t *endpoint_)
|
||||
{
|
||||
// Activate the session. Make it a child of this socket.
|
||||
launch_child (session);
|
||||
launch_child (endpoint_);
|
||||
endpoints.insert (std::make_pair (std::string (addr_), endpoint_));
|
||||
}
|
||||
|
||||
int zmq::socket_base_t::term_endpoint (const char *addr_)
|
||||
{
|
||||
// Check whether the library haven't been shut down yet.
|
||||
if (unlikely (ctx_terminated)) {
|
||||
errno = ETERM;
|
||||
return -1;
|
||||
}
|
||||
|
||||
// Check whether message passed to the function is valid.
|
||||
if (unlikely (!addr_)) {
|
||||
errno = EINVAL;
|
||||
return -1;
|
||||
}
|
||||
|
||||
// Find the endpoints range (if any) corresponding to the addr_ string.
|
||||
std::pair <endpoints_t::iterator, endpoints_t::iterator> range = endpoints.equal_range (std::string (addr_));
|
||||
if (range.first == range.second)
|
||||
return -1;
|
||||
|
||||
for (endpoints_t::iterator it = range.first; it != range.second; ++it)
|
||||
term_child (it->second);
|
||||
endpoints.erase (range.first, range.second);
|
||||
return 0;
|
||||
}
|
||||
|
||||
@ -605,16 +630,6 @@ int zmq::socket_base_t::send (msg_t *msg_, int flags_)
|
||||
return 0;
|
||||
}
|
||||
|
||||
int zmq::socket_base_t::term_endpoint (void *ep_)
|
||||
{
|
||||
if (unlikely (ctx_terminated)) {
|
||||
errno = ETERM;
|
||||
return -1;
|
||||
}
|
||||
term_child ((own_t *) ep_);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
|
||||
{
|
||||
// Check whether the library haven't been shut down yet.
|
||||
|
@ -24,6 +24,7 @@
|
||||
#define __ZMQ_SOCKET_BASE_HPP_INCLUDED__
|
||||
|
||||
#include <string>
|
||||
#include <map>
|
||||
|
||||
#include "own.hpp"
|
||||
#include "array.hpp"
|
||||
@ -71,7 +72,7 @@ namespace zmq
|
||||
int getsockopt (int option_, void *optval_, size_t *optvallen_);
|
||||
int bind (const char *addr_);
|
||||
int connect (const char *addr_);
|
||||
int term_endpoint (void *ep_);
|
||||
int term_endpoint (const char *addr_);
|
||||
int send (zmq::msg_t *msg_, int flags_);
|
||||
int recv (zmq::msg_t *msg_, int flags_);
|
||||
int close ();
|
||||
@ -132,6 +133,12 @@ namespace zmq
|
||||
void process_destroy ();
|
||||
|
||||
private:
|
||||
// Creates new endpoint ID and adds the endpoint to the map.
|
||||
void add_endpoint (const char *addr_, own_t *endpoint_);
|
||||
|
||||
// Map of open endpoints.
|
||||
typedef std::multimap <std::string, own_t *> endpoints_t;
|
||||
endpoints_t endpoints;
|
||||
|
||||
// To be called after processing commands or invoking any command
|
||||
// handlers explicitly. If required, it will deallocate the socket.
|
||||
|
@ -293,24 +293,24 @@ int zmq_connect (void *s_, const char *addr_)
|
||||
return result;
|
||||
}
|
||||
|
||||
int zmq_unbind (void *s_, void *ep_)
|
||||
int zmq_unbind (void *s_, const char *addr_)
|
||||
{
|
||||
if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
|
||||
errno = ENOTSOCK;
|
||||
return -1;
|
||||
}
|
||||
zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
|
||||
return s->term_endpoint (ep_);
|
||||
return s->term_endpoint (addr_);
|
||||
}
|
||||
|
||||
int zmq_disconnect (void *s_, void *ep_)
|
||||
int zmq_disconnect (void *s_, const char *addr_)
|
||||
{
|
||||
if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
|
||||
errno = ENOTSOCK;
|
||||
return -1;
|
||||
}
|
||||
zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
|
||||
return s->term_endpoint (ep_);
|
||||
return s->term_endpoint (addr_);
|
||||
}
|
||||
|
||||
// Sending functions.
|
||||
|
Loading…
Reference in New Issue
Block a user