mirror of
https://github.com/zeromq/libzmq.git
synced 2025-01-01 10:50:28 +08:00
Merge pull request #317 from shripchenko/master
implement zmq_unbind(),zmq_disconnect(), zmq->sock->getsockopt(ZMQ_LAST_ENDPOINT_ID)
This commit is contained in:
commit
8837852546
@ -227,6 +227,7 @@ ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int option, int optval);
|
||||
#define ZMQ_TCP_KEEPALIVE_IDLE 36
|
||||
#define ZMQ_TCP_KEEPALIVE_INTVL 37
|
||||
#define ZMQ_TCP_ACCEPT_FILTER 38
|
||||
#define ZMQ_LAST_ENDPOINT_ID 39
|
||||
|
||||
|
||||
/* Message options */
|
||||
@ -244,6 +245,8 @@ ZMQ_EXPORT int zmq_getsockopt (void *s, int option, void *optval,
|
||||
size_t *optvallen);
|
||||
ZMQ_EXPORT int zmq_bind (void *s, const char *addr);
|
||||
ZMQ_EXPORT int zmq_connect (void *s, const char *addr);
|
||||
ZMQ_EXPORT int zmq_unbind (void *s, void *ep);
|
||||
ZMQ_EXPORT int zmq_disconnect (void *s, void *ep);
|
||||
ZMQ_EXPORT int zmq_send (void *s, const void *buf, size_t len, int flags);
|
||||
ZMQ_EXPORT int zmq_recv (void *s, void *buf, size_t len, int flags);
|
||||
|
||||
|
@ -23,7 +23,8 @@
|
||||
#include "tcp_address.hpp"
|
||||
#include "ipc_address.hpp"
|
||||
|
||||
#include <string.h>
|
||||
#include <string>
|
||||
#include <sstream>
|
||||
|
||||
zmq::address_t::address_t (
|
||||
const std::string &protocol_, const std::string &address_)
|
||||
@ -50,3 +51,28 @@ zmq::address_t::~address_t ()
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
int zmq::address_t::to_string (std::string &addr_)
|
||||
{
|
||||
if (protocol == "tcp") {
|
||||
if (resolved.tcp_addr) {
|
||||
return resolved.tcp_addr->to_string(addr_);
|
||||
}
|
||||
}
|
||||
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS
|
||||
else if (protocol == "ipc") {
|
||||
if (resolved.ipc_addr) {
|
||||
return resolved.tcp_addr->to_string(addr_);
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
if (!protocol.empty () && !address.empty ()) {
|
||||
std::stringstream s;
|
||||
s << protocol << "://" << address;
|
||||
addr_ = s.str ();
|
||||
return 0;
|
||||
}
|
||||
addr_.clear ();
|
||||
return -1;
|
||||
}
|
||||
|
@ -44,6 +44,8 @@ namespace zmq
|
||||
ipc_address_t *ipc_addr;
|
||||
#endif
|
||||
} resolved;
|
||||
|
||||
int to_string (std::string &addr_);
|
||||
};
|
||||
}
|
||||
|
||||
|
@ -24,13 +24,24 @@
|
||||
|
||||
#include "err.hpp"
|
||||
|
||||
#include <string.h>
|
||||
#include <string>
|
||||
#include <sstream>
|
||||
|
||||
zmq::ipc_address_t::ipc_address_t ()
|
||||
{
|
||||
memset (&address, 0, sizeof (address));
|
||||
}
|
||||
|
||||
zmq::ipc_address_t::ipc_address_t (const sockaddr *sa, socklen_t sa_len)
|
||||
{
|
||||
zmq_assert(sa && sa_len > 0);
|
||||
|
||||
memset (&address, 0, sizeof (address));
|
||||
if (sa->sa_family == AF_UNIX) {
|
||||
memcpy(&address, sa, sa_len);
|
||||
}
|
||||
}
|
||||
|
||||
zmq::ipc_address_t::~ipc_address_t ()
|
||||
{
|
||||
}
|
||||
@ -47,6 +58,19 @@ int zmq::ipc_address_t::resolve (const char *path_)
|
||||
return 0;
|
||||
}
|
||||
|
||||
int zmq::ipc_address_t::to_string (std::string &addr_)
|
||||
{
|
||||
if (address.sun_family != AF_UNIX) {
|
||||
addr_.clear ();
|
||||
return -1;
|
||||
}
|
||||
|
||||
std::stringstream s;
|
||||
s << "ipc://" << address.sun_path;
|
||||
addr_ = s.str ();
|
||||
return 0;
|
||||
}
|
||||
|
||||
const sockaddr *zmq::ipc_address_t::addr () const
|
||||
{
|
||||
return (sockaddr*) &address;
|
||||
|
@ -21,6 +21,8 @@
|
||||
#ifndef __ZMQ_IPC_ADDRESS_HPP_INCLUDED__
|
||||
#define __ZMQ_IPC_ADDRESS_HPP_INCLUDED__
|
||||
|
||||
#include <string>
|
||||
|
||||
#include "platform.hpp"
|
||||
|
||||
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS
|
||||
@ -36,11 +38,15 @@ namespace zmq
|
||||
public:
|
||||
|
||||
ipc_address_t ();
|
||||
ipc_address_t (const sockaddr *sa, socklen_t sa_len);
|
||||
~ipc_address_t ();
|
||||
|
||||
// This function sets up the address for UNIX domain transport.
|
||||
int resolve (const char* path_);
|
||||
|
||||
// The opposite to resolve()
|
||||
int to_string (std::string &addr_);
|
||||
|
||||
const sockaddr *addr () const;
|
||||
socklen_t addrlen () const;
|
||||
|
||||
|
@ -98,22 +98,15 @@ void zmq::ipc_listener_t::in_event ()
|
||||
int zmq::ipc_listener_t::get_address (std::string &addr_)
|
||||
{
|
||||
struct sockaddr_storage ss;
|
||||
int rc;
|
||||
|
||||
// Get the details of the IPC socket
|
||||
socklen_t sl = sizeof (ss);
|
||||
rc = getsockname (s, (sockaddr *) &ss, &sl);
|
||||
int rc = getsockname (s, (sockaddr *) &ss, &sl);
|
||||
if (rc != 0) {
|
||||
addr_.clear ();
|
||||
return rc;
|
||||
}
|
||||
|
||||
// Store the address for retrieval by users using wildcards
|
||||
addr_ = std::string ("ipc://");
|
||||
struct sockaddr_un saddr;
|
||||
memcpy (&saddr, &ss, sizeof (saddr));
|
||||
|
||||
addr_.append (saddr.sun_path);
|
||||
return 0;
|
||||
ipc_address_t addr ((struct sockaddr *) &ss, sl);
|
||||
return addr.to_string (addr_);
|
||||
}
|
||||
|
||||
int zmq::ipc_listener_t::set_address (const char *addr_)
|
||||
|
@ -48,7 +48,7 @@ namespace zmq
|
||||
|
||||
// Set address to listen on.
|
||||
int set_address (const char *addr_);
|
||||
|
||||
|
||||
// Get the bound address for use with wildcards
|
||||
int get_address (std::string &addr_);
|
||||
|
||||
|
@ -30,6 +30,7 @@ zmq::options_t::options_t () :
|
||||
rcvhwm (1000),
|
||||
affinity (0),
|
||||
identity_size (0),
|
||||
last_endpoint_id(NULL),
|
||||
rate (100),
|
||||
recovery_ivl (10000),
|
||||
multicast_hops (1),
|
||||
@ -529,6 +530,15 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_)
|
||||
memcpy (optval_, last_endpoint.c_str(), last_endpoint.size()+1);
|
||||
*optvallen_ = last_endpoint.size()+1;
|
||||
return 0;
|
||||
|
||||
case ZMQ_LAST_ENDPOINT_ID:
|
||||
if (*optvallen_ < sizeof (void *)) {
|
||||
errno = EINVAL;
|
||||
return -1;
|
||||
}
|
||||
*((void **) optval_) = last_endpoint_id;
|
||||
*optvallen_ = sizeof (void *);
|
||||
return 0;
|
||||
}
|
||||
|
||||
errno = EINVAL;
|
||||
|
@ -53,6 +53,8 @@ namespace zmq
|
||||
|
||||
// Last socket endpoint URI
|
||||
std::string last_endpoint;
|
||||
// Last socket endpoint ID
|
||||
void *last_endpoint_id;
|
||||
|
||||
// Maximum tranfer rate [kb/s]. Default 100kb/s.
|
||||
int rate;
|
||||
|
@ -80,6 +80,11 @@ void zmq::own_t::launch_child (own_t *object_)
|
||||
send_own (this, object_);
|
||||
}
|
||||
|
||||
void zmq::own_t::term_child (own_t *object_)
|
||||
{
|
||||
process_term_req (object_);
|
||||
}
|
||||
|
||||
void zmq::own_t::process_term_req (own_t *object_)
|
||||
{
|
||||
// When shutting down we can ignore termination requests from owned
|
||||
|
@ -70,6 +70,9 @@ namespace zmq
|
||||
// Launch the supplied object and become its owner.
|
||||
void launch_child (own_t *object_);
|
||||
|
||||
// Terminate owned object
|
||||
void term_child (own_t *object_);
|
||||
|
||||
// Ask owner object to terminate this object. It may take a while
|
||||
// while actual termination is started. This function should not be
|
||||
// called more than once.
|
||||
|
@ -317,11 +317,16 @@ int zmq::socket_base_t::bind (const char *addr_)
|
||||
|
||||
if (protocol == "inproc") {
|
||||
endpoint_t endpoint = {this, options};
|
||||
return register_endpoint (addr_, endpoint);
|
||||
int rc = register_endpoint (addr_, endpoint);
|
||||
if (rc == 0) {
|
||||
// Save last endpoint info
|
||||
options.last_endpoint.clear ();
|
||||
options.last_endpoint_id = NULL;
|
||||
}
|
||||
return rc;
|
||||
}
|
||||
|
||||
if (protocol == "pgm" || protocol == "epgm") {
|
||||
|
||||
// For convenience's sake, bind can be used interchageable with
|
||||
// connect for PGM and EPGM transports.
|
||||
return connect (addr_);
|
||||
@ -345,7 +350,10 @@ int zmq::socket_base_t::bind (const char *addr_)
|
||||
return -1;
|
||||
}
|
||||
|
||||
rc = listener->get_address (options.last_endpoint);
|
||||
// Save last endpoint info
|
||||
options.last_endpoint_id = (void *) ((own_t *) listener);
|
||||
listener->get_address (options.last_endpoint);
|
||||
|
||||
launch_child (listener);
|
||||
return 0;
|
||||
}
|
||||
@ -361,7 +369,10 @@ int zmq::socket_base_t::bind (const char *addr_)
|
||||
return -1;
|
||||
}
|
||||
|
||||
rc = listener->get_address (options.last_endpoint);
|
||||
// Save last endpoint info
|
||||
options.last_endpoint_id = (void *) ((own_t *) listener);
|
||||
listener->get_address (options.last_endpoint);
|
||||
|
||||
launch_child (listener);
|
||||
return 0;
|
||||
}
|
||||
@ -453,6 +464,10 @@ int zmq::socket_base_t::connect (const char *addr_)
|
||||
// increased here.
|
||||
send_bind (peer.socket, pipes [1], false);
|
||||
|
||||
// Save last endpoint info
|
||||
options.last_endpoint.clear ();
|
||||
options.last_endpoint_id = NULL;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
@ -513,6 +528,10 @@ int zmq::socket_base_t::connect (const char *addr_)
|
||||
// Attach remote end of the pipe to the session object later on.
|
||||
session->attach_pipe (pipes [1]);
|
||||
|
||||
// Save last endpoint info
|
||||
paddr->to_string (options.last_endpoint);
|
||||
options.last_endpoint_id = (void *) ((own_t *) session);
|
||||
|
||||
// Activate the session. Make it a child of this socket.
|
||||
launch_child (session);
|
||||
|
||||
@ -584,6 +603,16 @@ int zmq::socket_base_t::send (msg_t *msg_, int flags_)
|
||||
return 0;
|
||||
}
|
||||
|
||||
int zmq::socket_base_t::term_endpoint (void *ep_)
|
||||
{
|
||||
if (unlikely (ctx_terminated)) {
|
||||
errno = ETERM;
|
||||
return -1;
|
||||
}
|
||||
term_child ((own_t *) ep_);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
|
||||
{
|
||||
// Check whether the library haven't been shut down yet.
|
||||
|
@ -72,6 +72,7 @@ namespace zmq
|
||||
int getsockopt (int option_, void *optval_, size_t *optvallen_);
|
||||
int bind (const char *addr_);
|
||||
int connect (const char *addr_);
|
||||
int term_endpoint (void *ep_);
|
||||
int send (zmq::msg_t *msg_, int flags_);
|
||||
int recv (zmq::msg_t *msg_, int flags_);
|
||||
int close ();
|
||||
|
@ -19,8 +19,8 @@
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include <string.h>
|
||||
#include <string>
|
||||
#include <sstream>
|
||||
|
||||
#include "tcp_address.hpp"
|
||||
#include "platform.hpp"
|
||||
@ -251,9 +251,8 @@ int zmq::tcp_address_t::resolve_interface (const char *interface_,
|
||||
int rc = resolve_nic_name (interface_, ipv4only_);
|
||||
if (rc != 0 && errno != ENODEV)
|
||||
return rc;
|
||||
if (rc == 0) {
|
||||
if (rc == 0)
|
||||
return 0;
|
||||
}
|
||||
|
||||
// There's no such interface name. Assume literal address.
|
||||
#if defined ZMQ_HAVE_OPENVMS && defined __ia64
|
||||
@ -367,6 +366,19 @@ zmq::tcp_address_t::tcp_address_t ()
|
||||
memset (&address, 0, sizeof (address));
|
||||
}
|
||||
|
||||
zmq::tcp_address_t::tcp_address_t (const sockaddr *sa, socklen_t sa_len)
|
||||
{
|
||||
zmq_assert(sa && sa_len > 0);
|
||||
|
||||
memset (&address, 0, sizeof (address));
|
||||
if (sa->sa_family == AF_INET && sa_len >= sizeof (address.ipv4)) {
|
||||
memcpy(&address.ipv4, sa, sizeof (address.ipv4));
|
||||
}
|
||||
else if (sa->sa_family == AF_INET6 && sa_len >= sizeof (address.ipv6)) {
|
||||
memcpy(&address.ipv6, sa, sizeof (address.ipv6));
|
||||
}
|
||||
}
|
||||
|
||||
zmq::tcp_address_t::~tcp_address_t ()
|
||||
{
|
||||
}
|
||||
@ -421,6 +433,34 @@ int zmq::tcp_address_t::resolve (const char *name_, bool local_, bool ipv4only_)
|
||||
return 0;
|
||||
}
|
||||
|
||||
int zmq::tcp_address_t::to_string (std::string &addr_)
|
||||
{
|
||||
if (address.generic.sa_family != AF_INET && address.generic.sa_family != AF_INET6) {
|
||||
addr_.clear ();
|
||||
return -1;
|
||||
}
|
||||
|
||||
// not using service resolv because of https://github.com/zeromq/libzmq/commit/1824574f9b5a8ce786853320e3ea09fe1f822bc4
|
||||
char hbuf[NI_MAXHOST];
|
||||
int rc = getnameinfo (addr (), addrlen (), hbuf, sizeof (hbuf), NULL, 0, NI_NUMERICHOST);
|
||||
if (rc != 0) {
|
||||
addr_.clear ();
|
||||
return rc;
|
||||
}
|
||||
|
||||
if (address.generic.sa_family == AF_INET6) {
|
||||
std::stringstream s;
|
||||
s << "tcp://[" << hbuf << "]:" << ntohs (address.ipv6.sin6_port);
|
||||
addr_ = s.str ();
|
||||
}
|
||||
else {
|
||||
std::stringstream s;
|
||||
s << "tcp://" << hbuf << ":" << ntohs (address.ipv4.sin_port);
|
||||
addr_ = s.str ();
|
||||
};
|
||||
return 0;
|
||||
}
|
||||
|
||||
const sockaddr *zmq::tcp_address_t::addr () const
|
||||
{
|
||||
return &address.generic;
|
||||
@ -504,6 +544,37 @@ int zmq::tcp_address_mask_t::resolve (const char *name_, bool ipv4only_)
|
||||
return 0;
|
||||
}
|
||||
|
||||
int zmq::tcp_address_mask_t::to_string (std::string &addr_)
|
||||
{
|
||||
if (address.generic.sa_family != AF_INET && address.generic.sa_family != AF_INET6) {
|
||||
addr_.clear ();
|
||||
return -1;
|
||||
}
|
||||
if (address_mask == -1) {
|
||||
addr_.clear ();
|
||||
return -1;
|
||||
}
|
||||
|
||||
char hbuf[NI_MAXHOST];
|
||||
int rc = getnameinfo (addr (), addrlen (), hbuf, sizeof (hbuf), NULL, 0, NI_NUMERICHOST);
|
||||
if (rc != 0) {
|
||||
addr_.clear ();
|
||||
return rc;
|
||||
}
|
||||
|
||||
if (address.generic.sa_family == AF_INET6) {
|
||||
std::stringstream s;
|
||||
s << "[" << hbuf << "]/" << address_mask;
|
||||
addr_ = s.str ();
|
||||
}
|
||||
else {
|
||||
std::stringstream s;
|
||||
s << hbuf << "/" << address_mask;
|
||||
addr_ = s.str ();
|
||||
};
|
||||
return 0;
|
||||
}
|
||||
|
||||
const bool zmq::tcp_address_mask_t::match_address (const struct sockaddr *ss, const socklen_t ss_len) const
|
||||
{
|
||||
zmq_assert (address_mask != -1 && ss != NULL && ss_len >= sizeof(struct sockaddr));
|
||||
|
@ -39,6 +39,7 @@ namespace zmq
|
||||
public:
|
||||
|
||||
tcp_address_t ();
|
||||
tcp_address_t (const sockaddr *sa, socklen_t sa_len);
|
||||
~tcp_address_t ();
|
||||
|
||||
// This function translates textual TCP address into an address
|
||||
@ -47,6 +48,9 @@ namespace zmq
|
||||
// If 'ipv4only' is true, the name will never resolve to IPv6 address.
|
||||
int resolve (const char* name_, bool local_, bool ipv4only_);
|
||||
|
||||
// The opposite to resolve()
|
||||
virtual int to_string (std::string &addr_);
|
||||
|
||||
#if defined ZMQ_HAVE_WINDOWS
|
||||
unsigned short family () const;
|
||||
#else
|
||||
@ -79,6 +83,9 @@ namespace zmq
|
||||
// Works only with remote hostnames.
|
||||
int resolve (const char* name_, bool ipv4only_);
|
||||
|
||||
// The opposite to resolve()
|
||||
int to_string (std::string &addr_);
|
||||
|
||||
const int mask () const;
|
||||
|
||||
const bool match_address (const struct sockaddr *ss, const socklen_t ss_len) const;
|
||||
|
@ -21,8 +21,7 @@
|
||||
|
||||
#include <new>
|
||||
|
||||
#include <string.h>
|
||||
#include <sstream>
|
||||
#include <string>
|
||||
|
||||
#include "platform.hpp"
|
||||
#include "tcp_listener.hpp"
|
||||
@ -121,37 +120,19 @@ void zmq::tcp_listener_t::close ()
|
||||
}
|
||||
|
||||
int zmq::tcp_listener_t::get_address (std::string &addr_)
|
||||
{
|
||||
struct sockaddr_storage ss;
|
||||
char host [NI_MAXHOST];
|
||||
int rc;
|
||||
std::stringstream address;
|
||||
|
||||
{
|
||||
// Get the details of the TCP socket
|
||||
struct sockaddr_storage ss;
|
||||
socklen_t sl = sizeof (ss);
|
||||
rc = getsockname (s, (struct sockaddr *) &ss, &sl);
|
||||
int rc = getsockname (s, (struct sockaddr *) &ss, &sl);
|
||||
|
||||
if (rc != 0) {
|
||||
addr_.clear ();
|
||||
return rc;
|
||||
}
|
||||
|
||||
rc = getnameinfo ((struct sockaddr *) &ss, sl, host, NI_MAXHOST, NULL, 0, NI_NUMERICHOST);
|
||||
if (rc != 0) {
|
||||
return rc;
|
||||
}
|
||||
|
||||
if (ss.ss_family == AF_INET) {
|
||||
struct sockaddr_in sa = {0};
|
||||
memcpy (&sa, &ss, sizeof (sa));
|
||||
|
||||
address << "tcp://" << host << ":" << ntohs (sa.sin_port);
|
||||
} else {
|
||||
struct sockaddr_in6 sa = {0};
|
||||
memcpy (&sa, &ss, sizeof (sa));
|
||||
|
||||
address << "tcp://[" << host << "]:" << ntohs (sa.sin6_port);
|
||||
}
|
||||
addr_ = address.str ();
|
||||
return 0;
|
||||
tcp_address_t addr ((struct sockaddr *) &ss, sl);
|
||||
return addr.to_string (addr_);
|
||||
}
|
||||
|
||||
int zmq::tcp_listener_t::set_address (const char *addr_)
|
||||
@ -203,7 +184,6 @@ int zmq::tcp_listener_t::set_address (const char *addr_)
|
||||
errno_assert (rc == 0);
|
||||
#endif
|
||||
|
||||
|
||||
// Bind the socket to the network interface and port.
|
||||
rc = bind (s, address.addr (), address.addrlen ());
|
||||
#ifdef ZMQ_HAVE_WINDOWS
|
||||
|
20
src/zmq.cpp
20
src/zmq.cpp
@ -293,6 +293,26 @@ int zmq_connect (void *s_, const char *addr_)
|
||||
return result;
|
||||
}
|
||||
|
||||
int zmq_unbind (void *s_, void *ep_)
|
||||
{
|
||||
if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
|
||||
errno = ENOTSOCK;
|
||||
return -1;
|
||||
}
|
||||
zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
|
||||
return s->term_endpoint (ep_);
|
||||
}
|
||||
|
||||
int zmq_disconnect (void *s_, void *ep_)
|
||||
{
|
||||
if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
|
||||
errno = ENOTSOCK;
|
||||
return -1;
|
||||
}
|
||||
zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
|
||||
return s->term_endpoint (ep_);
|
||||
}
|
||||
|
||||
// Sending functions.
|
||||
|
||||
static int
|
||||
|
Loading…
x
Reference in New Issue
Block a user