Merge pull request #1953 from somdoron/master

problem: src is broken and unneeded as we have metadata
This commit is contained in:
Luca Boccassi 2016-05-04 13:35:19 +01:00
commit feec47604f
10 changed files with 20 additions and 73 deletions

View File

@ -339,7 +339,6 @@ ZMQ_EXPORT const char *zmq_msg_gets (zmq_msg_t *msg, const char *property);
/* Message options */ /* Message options */
#define ZMQ_MORE 1 #define ZMQ_MORE 1
#define ZMQ_SRCFD 2
#define ZMQ_SHARED 3 #define ZMQ_SHARED 3
/* Send/recv options. */ /* Send/recv options. */
@ -366,6 +365,9 @@ ZMQ_EXPORT const char *zmq_msg_gets (zmq_msg_t *msg, const char *property);
#define ZMQ_FAIL_UNROUTABLE ZMQ_ROUTER_MANDATORY #define ZMQ_FAIL_UNROUTABLE ZMQ_ROUTER_MANDATORY
#define ZMQ_ROUTER_BEHAVIOR ZMQ_ROUTER_MANDATORY #define ZMQ_ROUTER_BEHAVIOR ZMQ_ROUTER_MANDATORY
/* Deprecated Message options */
#define ZMQ_SRCFD 2
/******************************************************************************/ /******************************************************************************/
/* 0MQ socket events and monitoring */ /* 0MQ socket events and monitoring */
/******************************************************************************/ /******************************************************************************/

View File

@ -87,7 +87,6 @@ int zmq::msg_t::init ()
u.vsm.size = 0; u.vsm.size = 0;
u.vsm.group[0] = '\0'; u.vsm.group[0] = '\0';
u.vsm.routing_id = 0; u.vsm.routing_id = 0;
u.vsm.fd = retired_fd;
return 0; return 0;
} }
@ -100,7 +99,6 @@ int zmq::msg_t::init_size (size_t size_)
u.vsm.size = (unsigned char) size_; u.vsm.size = (unsigned char) size_;
u.vsm.group[0] = '\0'; u.vsm.group[0] = '\0';
u.vsm.routing_id = 0; u.vsm.routing_id = 0;
u.vsm.fd = retired_fd;
} }
else { else {
u.lmsg.metadata = NULL; u.lmsg.metadata = NULL;
@ -108,7 +106,6 @@ int zmq::msg_t::init_size (size_t size_)
u.lmsg.flags = 0; u.lmsg.flags = 0;
u.lmsg.group[0] = '\0'; u.lmsg.group[0] = '\0';
u.lmsg.routing_id = 0; u.lmsg.routing_id = 0;
u.lmsg.fd = retired_fd;
u.lmsg.content = NULL; u.lmsg.content = NULL;
if (sizeof (content_t) + size_ > size_) if (sizeof (content_t) + size_ > size_)
u.lmsg.content = (content_t*) malloc (sizeof (content_t) + size_); u.lmsg.content = (content_t*) malloc (sizeof (content_t) + size_);
@ -137,7 +134,6 @@ int zmq::msg_t::init_external_storage(content_t* content_, void* data_, size_t s
u.zclmsg.flags = 0; u.zclmsg.flags = 0;
u.zclmsg.group[0] = '\0'; u.zclmsg.group[0] = '\0';
u.zclmsg.routing_id = 0; u.zclmsg.routing_id = 0;
u.zclmsg.fd = retired_fd;
u.zclmsg.content = content_; u.zclmsg.content = content_;
u.zclmsg.content->data = data_; u.zclmsg.content->data = data_;
@ -165,7 +161,6 @@ int zmq::msg_t::init_data (void *data_, size_t size_,
u.cmsg.size = size_; u.cmsg.size = size_;
u.cmsg.group[0] = '\0'; u.cmsg.group[0] = '\0';
u.cmsg.routing_id = 0; u.cmsg.routing_id = 0;
u.cmsg.fd = retired_fd;
} }
else { else {
u.lmsg.metadata = NULL; u.lmsg.metadata = NULL;
@ -173,7 +168,6 @@ int zmq::msg_t::init_data (void *data_, size_t size_,
u.lmsg.flags = 0; u.lmsg.flags = 0;
u.lmsg.group[0] = '\0'; u.lmsg.group[0] = '\0';
u.lmsg.routing_id = 0; u.lmsg.routing_id = 0;
u.lmsg.fd = retired_fd;
u.lmsg.content = (content_t*) malloc (sizeof (content_t)); u.lmsg.content = (content_t*) malloc (sizeof (content_t));
if (!u.lmsg.content) { if (!u.lmsg.content) {
errno = ENOMEM; errno = ENOMEM;
@ -197,7 +191,6 @@ int zmq::msg_t::init_delimiter ()
u.delimiter.flags = 0; u.delimiter.flags = 0;
u.delimiter.group[0] = '\0'; u.delimiter.group[0] = '\0';
u.delimiter.routing_id = 0; u.delimiter.routing_id = 0;
u.delimiter.fd = retired_fd;
return 0; return 0;
} }
@ -208,7 +201,6 @@ int zmq::msg_t::init_join ()
u.base.flags = 0; u.base.flags = 0;
u.base.group[0] = '\0'; u.base.group[0] = '\0';
u.base.routing_id = 0; u.base.routing_id = 0;
u.base.fd = retired_fd;
return 0; return 0;
} }
@ -219,7 +211,6 @@ int zmq::msg_t::init_leave ()
u.base.flags = 0; u.base.flags = 0;
u.base.group[0] = '\0'; u.base.group[0] = '\0';
u.base.routing_id = 0; u.base.routing_id = 0;
u.base.fd = retired_fd;
return 0; return 0;
} }
@ -400,16 +391,6 @@ void zmq::msg_t::reset_flags (unsigned char flags_)
u.base.flags &= ~flags_; u.base.flags &= ~flags_;
} }
zmq::fd_t zmq::msg_t::fd ()
{
return u.base.fd;
}
void zmq::msg_t::set_fd (fd_t fd_)
{
u.base.fd = fd_;
}
zmq::metadata_t *zmq::msg_t::metadata () const zmq::metadata_t *zmq::msg_t::metadata () const
{ {
return u.base.metadata; return u.base.metadata;

View File

@ -105,8 +105,6 @@ namespace zmq
unsigned char flags (); unsigned char flags ();
void set_flags (unsigned char flags_); void set_flags (unsigned char flags_);
void reset_flags (unsigned char flags_); void reset_flags (unsigned char flags_);
fd_t fd ();
void set_fd (fd_t fd_);
metadata_t *metadata () const; metadata_t *metadata () const;
void set_metadata (metadata_t *metadata_); void set_metadata (metadata_t *metadata_);
void reset_metadata (); void reset_metadata ();
@ -139,8 +137,7 @@ namespace zmq
enum { max_vsm_size = msg_t_size - (sizeof (metadata_t *) + enum { max_vsm_size = msg_t_size - (sizeof (metadata_t *) +
3 + 3 +
16 + 16 +
sizeof (uint32_t) + sizeof (uint32_t))};
sizeof (fd_t))};
private: private:
zmq::atomic_counter_t* refcnt(); zmq::atomic_counter_t* refcnt();
@ -179,13 +176,11 @@ namespace zmq
unsigned char unused [msg_t_size - (sizeof (metadata_t *) + unsigned char unused [msg_t_size - (sizeof (metadata_t *) +
2 + 2 +
16 + 16 +
sizeof (uint32_t) + sizeof (uint32_t))];
sizeof (fd_t))];
unsigned char type; unsigned char type;
unsigned char flags; unsigned char flags;
char group [16]; char group [16];
uint32_t routing_id; uint32_t routing_id;
fd_t fd;
} base; } base;
struct { struct {
metadata_t *metadata; metadata_t *metadata;
@ -195,7 +190,6 @@ namespace zmq
unsigned char flags; unsigned char flags;
char group [16]; char group [16];
uint32_t routing_id; uint32_t routing_id;
fd_t fd;
} vsm; } vsm;
struct { struct {
metadata_t *metadata; metadata_t *metadata;
@ -204,13 +198,11 @@ namespace zmq
sizeof (content_t*) + sizeof (content_t*) +
2 + 2 +
16 + 16 +
sizeof (uint32_t) + sizeof (uint32_t))];
sizeof (fd_t))];
unsigned char type; unsigned char type;
unsigned char flags; unsigned char flags;
char group [16]; char group [16];
uint32_t routing_id; uint32_t routing_id;
fd_t fd;
} lmsg; } lmsg;
struct { struct {
metadata_t *metadata; metadata_t *metadata;
@ -219,13 +211,11 @@ namespace zmq
sizeof (content_t*) + sizeof (content_t*) +
2 + 2 +
16 + 16 +
sizeof (uint32_t) + sizeof (uint32_t))];
sizeof (fd_t))];
unsigned char type; unsigned char type;
unsigned char flags; unsigned char flags;
char group [16]; char group [16];
uint32_t routing_id; uint32_t routing_id;
fd_t fd;
} zclmsg; } zclmsg;
struct { struct {
metadata_t *metadata; metadata_t *metadata;
@ -236,26 +226,22 @@ namespace zmq
sizeof (size_t) + sizeof (size_t) +
2 + 2 +
16 + 16 +
sizeof (uint32_t) + sizeof (uint32_t))];
sizeof (fd_t))];
unsigned char type; unsigned char type;
unsigned char flags; unsigned char flags;
char group [16]; char group [16];
uint32_t routing_id; uint32_t routing_id;
fd_t fd;
} cmsg; } cmsg;
struct { struct {
metadata_t *metadata; metadata_t *metadata;
unsigned char unused [msg_t_size - (sizeof (metadata_t *) + unsigned char unused [msg_t_size - (sizeof (metadata_t *) +
2 + 2 +
16 + 16 +
sizeof (uint32_t) + sizeof (uint32_t))];
sizeof (fd_t))];
unsigned char type; unsigned char type;
unsigned char flags; unsigned char flags;
char group [16]; char group [16];
uint32_t routing_id; uint32_t routing_id;
fd_t fd;
} delimiter; } delimiter;
} u; } u;
}; };

View File

@ -197,7 +197,6 @@ zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_, int sid_, bool
last_tsc (0), last_tsc (0),
ticks (0), ticks (0),
rcvmore (false), rcvmore (false),
file_desc(-1),
monitor_socket (NULL), monitor_socket (NULL),
monitor_events (0), monitor_events (0),
thread_safe (thread_safe_), thread_safe (thread_safe_),
@ -1215,8 +1214,6 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
// If we have the message, return immediately. // If we have the message, return immediately.
if (rc == 0) { if (rc == 0) {
if (file_desc != retired_fd)
msg_->set_fd(file_desc);
extract_flags (msg_); extract_flags (msg_);
EXIT_MUTEX (); EXIT_MUTEX ();
return 0; return 0;
@ -1238,8 +1235,6 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
EXIT_MUTEX (); EXIT_MUTEX ();
return rc; return rc;
} }
if (file_desc != retired_fd)
msg_->set_fd(file_desc);
extract_flags (msg_); extract_flags (msg_);
EXIT_MUTEX (); EXIT_MUTEX ();
@ -1279,8 +1274,6 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
} }
} }
if (file_desc != retired_fd)
msg_->set_fd(file_desc);
extract_flags (msg_); extract_flags (msg_);
EXIT_MUTEX (); EXIT_MUTEX ();
return 0; return 0;
@ -1646,16 +1639,6 @@ int zmq::socket_base_t::monitor (const char *addr_, int events_)
return rc; return rc;
} }
void zmq::socket_base_t::set_fd(zmq::fd_t fd_)
{
file_desc = fd_;
}
zmq::fd_t zmq::socket_base_t::fd()
{
return file_desc;
}
void zmq::socket_base_t::event_connected (const std::string &addr_, int fd_) void zmq::socket_base_t::event_connected (const std::string &addr_, int fd_)
{ {
if (monitor_events & ZMQ_EVENT_CONNECTED) if (monitor_events & ZMQ_EVENT_CONNECTED)

View File

@ -123,9 +123,6 @@ namespace zmq
int monitor (const char *endpoint_, int events_); int monitor (const char *endpoint_, int events_);
void set_fd(fd_t fd_);
fd_t fd();
void event_connected (const std::string &addr_, int fd_); void event_connected (const std::string &addr_, int fd_);
void event_connect_delayed (const std::string &addr_, int err_); void event_connect_delayed (const std::string &addr_, int err_);
void event_connect_retried (const std::string &addr_, int interval_); void event_connect_retried (const std::string &addr_, int interval_);
@ -264,9 +261,6 @@ namespace zmq
// True if the last message received had MORE flag set. // True if the last message received had MORE flag set.
bool rcvmore; bool rcvmore;
// File descriptor if applicable
fd_t file_desc;
// Improves efficiency of time measurement. // Improves efficiency of time measurement.
clock_t clock; clock_t clock;

View File

@ -152,9 +152,6 @@ void zmq::socks_connecter_t::in_event ()
if (rc == -1) if (rc == -1)
error (); error ();
else { else {
// Remember our fd for ZMQ_SRCFD in messages
socket->set_fd (s);
// Create the engine object for this connection. // Create the engine object for this connection.
stream_engine_t *engine = new (std::nothrow) stream_engine_t *engine = new (std::nothrow)
stream_engine_t (s, options, endpoint); stream_engine_t (s, options, endpoint);

View File

@ -985,6 +985,10 @@ void zmq::stream_engine_t::set_handshake_timer ()
bool zmq::stream_engine_t::init_properties (properties_t & properties) { bool zmq::stream_engine_t::init_properties (properties_t & properties) {
if (peer_address.empty()) return false; if (peer_address.empty()) return false;
properties.insert (std::make_pair("Peer-Address", peer_address)); properties.insert (std::make_pair("Peer-Address", peer_address));
// Private property to support deprecated SRCFD
std::string fd_string = static_cast<std::ostringstream*>(&(std::ostringstream() << (int)s))->str();
properties.insert (std::make_pair("__fd", fd_string));
return true; return true;
} }

View File

@ -151,9 +151,6 @@ void zmq::tcp_connecter_t::out_event ()
options.tcp_keepalive_idle, options.tcp_keepalive_intvl); options.tcp_keepalive_idle, options.tcp_keepalive_intvl);
tune_tcp_maxrt (fd, options.tcp_maxrt); tune_tcp_maxrt (fd, options.tcp_maxrt);
// remember our fd for ZMQ_SRCFD in messages
socket->set_fd (fd);
// Create the engine object for this connection. // Create the engine object for this connection.
stream_engine_t *engine = new (std::nothrow) stream_engine_t *engine = new (std::nothrow)
stream_engine_t (fd, options, endpoint); stream_engine_t (fd, options, endpoint);

View File

@ -105,9 +105,6 @@ void zmq::tcp_listener_t::in_event ()
options.tcp_keepalive_idle, options.tcp_keepalive_intvl); options.tcp_keepalive_idle, options.tcp_keepalive_intvl);
tune_tcp_maxrt (fd, options.tcp_maxrt); tune_tcp_maxrt (fd, options.tcp_maxrt);
// remember our fd for ZMQ_SRCFD in messages
socket->set_fd(fd);
// Create the engine object for this connection. // Create the engine object for this connection.
stream_engine_t *engine = new (std::nothrow) stream_engine_t *engine = new (std::nothrow)
stream_engine_t (fd, options, endpoint); stream_engine_t (fd, options, endpoint);

View File

@ -680,11 +680,17 @@ int zmq_msg_more (zmq_msg_t *msg_)
int zmq_msg_get (zmq_msg_t *msg_, int property_) int zmq_msg_get (zmq_msg_t *msg_, int property_)
{ {
const char* fd_string;
switch (property_) { switch (property_) {
case ZMQ_MORE: case ZMQ_MORE:
return (((zmq::msg_t*) msg_)->flags () & zmq::msg_t::more)? 1: 0; return (((zmq::msg_t*) msg_)->flags () & zmq::msg_t::more)? 1: 0;
case ZMQ_SRCFD: case ZMQ_SRCFD:
return (int)((zmq::msg_t*) msg_)->fd (); fd_string = zmq_msg_gets(msg_, "__fd");
if (fd_string == NULL)
return (int)-1;
return atoi(fd_string);
case ZMQ_SHARED: case ZMQ_SHARED:
return (((zmq::msg_t*) msg_)->is_cmsg ()) || return (((zmq::msg_t*) msg_)->is_cmsg ()) ||
(((zmq::msg_t*) msg_)->flags () & zmq::msg_t::shared)? 1: 0; (((zmq::msg_t*) msg_)->flags () & zmq::msg_t::shared)? 1: 0;