0
0
mirror of https://github.com/zeromq/libzmq.git synced 2025-01-14 01:37:56 +08:00

Merge remote-tracking branch 'remotes/upstream/master'

This commit is contained in:
Telford Berkey 2015-02-02 10:27:25 -05:00
commit 05df7072d6
18 changed files with 815 additions and 9 deletions

View File

@ -369,6 +369,7 @@ endif()
set(cxx-sources
address.cpp
client.cpp
clock.cpp
ctx.cpp
curve_client.cpp
@ -418,6 +419,7 @@ set(cxx-sources
req.cpp
router.cpp
select.cpp
server.cpp
session_base.cpp
signaler.cpp
socket_base.cpp

View File

@ -26,6 +26,8 @@ src_libzmq_la_SOURCES = \
src/atomic_counter.hpp \
src/atomic_ptr.hpp \
src/blob.hpp \
src/client.cpp \
src/client.hpp \
src/clock.cpp \
src/clock.hpp \
src/command.hpp \
@ -144,6 +146,8 @@ src_libzmq_la_SOURCES = \
src/router.hpp \
src/select.cpp \
src/select.hpp \
src/server.cpp \
src/server.hpp \
src/session_base.cpp \
src/session_base.hpp \
src/signaler.cpp \
@ -338,7 +342,10 @@ test_apps = \
tests/test_xpub_nodrop \
tests/test_xpub_manual \
tests/test_xpub_welcome_msg \
tests/test_atomics
tests/test_atomics \
tests/test_client_server \
tests/test_server_drop_more \
tests/test_client_drop_more
tests_test_system_SOURCES = tests/test_system.cpp
tests_test_system_LDADD = src/libzmq.la
@ -510,6 +517,15 @@ tests_test_xpub_welcome_msg_LDADD = src/libzmq.la
tests_test_atomics_SOURCES = tests/test_atomics.cpp
tests_test_atomics_LDADD = src/libzmq.la
tests_test_client_server_SOURCES = tests/test_client_server.cpp
tests_test_client_server_LDADD = src/libzmq.la
tests_test_server_drop_more_SOURCES = tests/test_server_drop_more.cpp
tests_test_server_drop_more_LDADD = src/libzmq.la
tests_test_client_drop_more_SOURCES = tests/test_client_drop_more.cpp
tests_test_client_drop_more_LDADD = src/libzmq.la
if !ON_MINGW
if !ON_CYGWIN
test_apps += \

View File

@ -217,6 +217,8 @@ ZMQ_EXPORT int zmq_msg_more (zmq_msg_t *msg);
ZMQ_EXPORT int zmq_msg_get (zmq_msg_t *msg, int property);
ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int property, int optval);
ZMQ_EXPORT const char *zmq_msg_gets (zmq_msg_t *msg, const char *property);
ZMQ_EXPORT int zmq_msg_set_routing_id(zmq_msg_t *msg, uint32_t routing_id);
ZMQ_EXPORT uint32_t zmq_msg_get_routing_id(zmq_msg_t *msg);
/******************************************************************************/
@ -236,6 +238,8 @@ ZMQ_EXPORT const char *zmq_msg_gets (zmq_msg_t *msg, const char *property);
#define ZMQ_XPUB 9
#define ZMQ_XSUB 10
#define ZMQ_STREAM 11
#define ZMQ_SERVER 12
#define ZMQ_CLIENT 13
/* Deprecated aliases */
#define ZMQ_XREQ ZMQ_DEALER

102
src/client.cpp Normal file
View File

@ -0,0 +1,102 @@
/*
Copyright (c) 2007-2015 Contributors as noted in the AUTHORS file
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "client.hpp"
#include "err.hpp"
#include "msg.hpp"
zmq::client_t::client_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
socket_base_t (parent_, tid_, sid_)
{
options.type = ZMQ_CLIENT;
}
zmq::client_t::~client_t ()
{
}
void zmq::client_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
{
// subscribe_to_all_ is unused
(void) subscribe_to_all_;
zmq_assert (pipe_);
fq.attach (pipe_);
lb.attach (pipe_);
}
int zmq::client_t::xsend (msg_t *msg_)
{
zmq_assert(!(msg_->flags () & msg_t::more));
return lb.sendpipe (msg_, NULL);
}
int zmq::client_t::xrecv (msg_t *msg_)
{
int rc = fq.recvpipe (msg_, NULL);
// Drop any messages with more flag
while (rc == 0 && msg_->flags () & msg_t::more) {
// drop all frames of the current multi-frame message
rc = fq.recvpipe (msg_, NULL);
while (rc == 0 && msg_->flags () & msg_t::more)
rc = fq.recvpipe (msg_, NULL);
// get the new message
if (rc == 0)
rc = fq.recvpipe (msg_, NULL);
}
return rc;
}
bool zmq::client_t::xhas_in ()
{
return fq.has_in ();
}
bool zmq::client_t::xhas_out ()
{
return lb.has_out ();
}
zmq::blob_t zmq::client_t::get_credential () const
{
return fq.get_credential ();
}
void zmq::client_t::xread_activated (pipe_t *pipe_)
{
fq.activated (pipe_);
}
void zmq::client_t::xwrite_activated (pipe_t *pipe_)
{
lb.activated (pipe_);
}
void zmq::client_t::xpipe_terminated (pipe_t *pipe_)
{
fq.pipe_terminated (pipe_);
lb.pipe_terminated (pipe_);
}

71
src/client.hpp Normal file
View File

@ -0,0 +1,71 @@
/*
Copyright (c) 2007-2015 Contributors as noted in the AUTHORS file
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_CLIENT_HPP_INCLUDED__
#define __ZMQ_CLIENT_HPP_INCLUDED__
#include "socket_base.hpp"
#include "session_base.hpp"
#include "fq.hpp"
#include "lb.hpp"
namespace zmq
{
class ctx_t;
class msg_t;
class pipe_t;
class io_thread_t;
class socket_base_t;
class client_t :
public socket_base_t
{
public:
client_t (zmq::ctx_t *parent_, uint32_t tid_, int sid);
~client_t ();
protected:
// Overrides of functions from socket_base_t.
void xattach_pipe (zmq::pipe_t *pipe_, bool subscribe_to_all_);
int xsend (zmq::msg_t *msg_);
int xrecv (zmq::msg_t *msg_);
bool xhas_in ();
bool xhas_out ();
blob_t get_credential () const;
void xread_activated (zmq::pipe_t *pipe_);
void xwrite_activated (zmq::pipe_t *pipe_);
void xpipe_terminated (zmq::pipe_t *pipe_);
private:
// Messages are fair-queued from inbound pipes. And load-balanced to
// the outbound pipes.
fq_t fq;
lb_t lb;
client_t (const client_t &);
const client_t &operator = (const client_t&);
};
}
#endif

View File

@ -64,8 +64,8 @@ const char *zmq::mechanism_t::socket_type_string (int socket_type) const
{
static const char *names [] = {"PAIR", "PUB", "SUB", "REQ", "REP",
"DEALER", "ROUTER", "PULL", "PUSH",
"XPUB", "XSUB", "STREAM"};
zmq_assert (socket_type >= 0 && socket_type <= 10);
"XPUB", "XSUB", "STREAM", "SERVER", "CLIENT"};
zmq_assert (socket_type >= 0 && socket_type <= 13);
return names [socket_type];
}
@ -177,6 +177,10 @@ bool zmq::mechanism_t::check_socket_type (const std::string& type_) const
return type_ == "PUB" || type_ == "XPUB";
case ZMQ_PAIR:
return type_ == "PAIR";
case ZMQ_SERVER:
return type_ == "CLIENT";
case ZMQ_CLIENT:
return type_ == "CLIENT" || type_ == "SERVER";
default:
break;
}

View File

@ -46,6 +46,7 @@ int zmq::msg_t::init ()
u.vsm.type = type_vsm;
u.vsm.flags = 0;
u.vsm.size = 0;
u.vsm.routing_id = 0;
file_desc = -1;
return 0;
}
@ -58,11 +59,13 @@ int zmq::msg_t::init_size (size_t size_)
u.vsm.type = type_vsm;
u.vsm.flags = 0;
u.vsm.size = (unsigned char) size_;
u.vsm.routing_id = 0;
}
else {
u.lmsg.metadata = NULL;
u.lmsg.type = type_lmsg;
u.lmsg.flags = 0;
u.lmsg.routing_id = 0;
u.lmsg.content =
(content_t*) malloc (sizeof (content_t) + size_);
if (unlikely (!u.lmsg.content)) {
@ -95,11 +98,13 @@ int zmq::msg_t::init_data (void *data_, size_t size_, msg_free_fn *ffn_,
u.cmsg.flags = 0;
u.cmsg.data = data_;
u.cmsg.size = size_;
u.cmsg.routing_id = 0;
}
else {
u.lmsg.metadata = NULL;
u.lmsg.type = type_lmsg;
u.lmsg.flags = 0;
u.lmsg.routing_id = 0;
u.lmsg.content = (content_t*) malloc (sizeof (content_t));
if (!u.lmsg.content) {
errno = ENOMEM;
@ -121,6 +126,7 @@ int zmq::msg_t::init_delimiter ()
u.delimiter.metadata = NULL;
u.delimiter.type = type_delimiter;
u.delimiter.flags = 0;
u.delimiter.routing_id = 0;
return 0;
}
@ -377,3 +383,14 @@ bool zmq::msg_t::rm_refs (int refs_)
return true;
}
uint32_t zmq::msg_t::get_routing_id()
{
return u.base.routing_id;
}
int zmq::msg_t::set_routing_id(uint32_t routing_id_)
{
u.base.routing_id = routing_id_;
return 0;
}

View File

@ -79,6 +79,8 @@ namespace zmq
bool is_delimiter () const;
bool is_vsm ();
bool is_cmsg ();
uint32_t get_routing_id();
int set_routing_id(uint32_t routing_id_);
// After calling this function you can copy the message in POD-style
// refs_ times. No need to call copy.
@ -93,7 +95,7 @@ namespace zmq
// Size in bytes of the largest message that is still copied around
// rather than being reference-counted.
enum { msg_t_size = 64 };
enum { max_vsm_size = msg_t_size - (8 + sizeof (metadata_t *) + 3) };
enum { max_vsm_size = msg_t_size - (8 + sizeof (metadata_t *) + 3 + sizeof(uint32_t)) };
// Shared message buffer. Message data are either allocated in one
// continuous block along with this structure - thus avoiding one
@ -136,9 +138,10 @@ namespace zmq
union {
struct {
metadata_t *metadata;
unsigned char unused [msg_t_size - (8 + sizeof (metadata_t *) + 2)];
unsigned char unused [msg_t_size - (8 + sizeof (metadata_t *) + 2 + sizeof(uint32_t))];
unsigned char type;
unsigned char flags;
uint32_t routing_id;
} base;
struct {
metadata_t *metadata;
@ -146,28 +149,32 @@ namespace zmq
unsigned char size;
unsigned char type;
unsigned char flags;
uint32_t routing_id;
} vsm;
struct {
metadata_t *metadata;
content_t *content;
unsigned char unused [msg_t_size - (8 + sizeof (metadata_t *) + sizeof (content_t*) + 2)];
unsigned char unused [msg_t_size - (8 + sizeof (metadata_t *) + sizeof (content_t*) + 2 + sizeof(uint32_t))];
unsigned char type;
unsigned char flags;
uint32_t routing_id;
} lmsg;
struct {
metadata_t *metadata;
void* data;
size_t size;
unsigned char unused
[msg_t_size - (8 + sizeof (metadata_t *) + sizeof (void*) + sizeof (size_t) + 2)];
[msg_t_size - (8 + sizeof (metadata_t *) + sizeof (void*) + sizeof (size_t) + 2 + sizeof(uint32_t))];
unsigned char type;
unsigned char flags;
uint32_t routing_id;
} cmsg;
struct {
metadata_t *metadata;
unsigned char unused [msg_t_size - (8 + sizeof (metadata_t *) + 2)];
unsigned char unused [msg_t_size - (8 + sizeof (metadata_t *) + 2 + sizeof(uint32_t))];
unsigned char type;
unsigned char flags;
uint32_t routing_id;
} delimiter;
} u;
};

View File

@ -100,6 +100,16 @@ void zmq::pipe_t::set_event_sink (i_pipe_events *sink_)
sink = sink_;
}
void zmq::pipe_t::set_routing_id (uint32_t routing_id_)
{
routing_id = routing_id_;
}
uint32_t zmq::pipe_t::get_routing_id ()
{
return routing_id;
}
void zmq::pipe_t::set_identity (const blob_t &identity_)
{
identity = identity_;

View File

@ -74,6 +74,10 @@ namespace zmq
// Specifies the object to send events to.
void set_event_sink (i_pipe_events *sink_);
// Pipe endpoint can store an routing ID to be used by its clients.
void set_routing_id(uint32_t routing_id_);
uint32_t get_routing_id();
// Pipe endpoint can store an opaque ID to be used by its clients.
void set_identity (const blob_t &identity_);
blob_t get_identity ();
@ -204,6 +208,9 @@ namespace zmq
// Identity of the writer. Used uniquely by the reader side.
blob_t identity;
// Identity of the writer. Used uniquely by the reader side.
int routing_id;
// Pipe's credential.
blob_t credential;

163
src/server.cpp Normal file
View File

@ -0,0 +1,163 @@
/*
Copyright (c) 2007-2015 Contributors as noted in the AUTHORS file
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "server.hpp"
#include "pipe.hpp"
#include "wire.hpp"
#include "random.hpp"
#include "likely.hpp"
#include "err.hpp"
zmq::server_t::server_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
socket_base_t (parent_, tid_, sid_),
next_rid (generate_random ())
{
options.type = ZMQ_SERVER;
}
zmq::server_t::~server_t ()
{
zmq_assert (outpipes.empty ());
}
void zmq::server_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
{
// subscribe_to_all_ is unused
(void)subscribe_to_all_;
zmq_assert (pipe_);
uint32_t routing_id = next_rid++;
pipe_->set_routing_id (routing_id);
// Add the record into output pipes lookup table
outpipe_t outpipe = {pipe_, true};
bool ok = outpipes.insert (outpipes_t::value_type (routing_id, outpipe)).second;
zmq_assert (ok);
fq.attach (pipe_);
}
void zmq::server_t::xpipe_terminated (pipe_t *pipe_)
{
outpipes_t::iterator it = outpipes.find (pipe_->get_routing_id ());
zmq_assert (it != outpipes.end ());
outpipes.erase (it);
fq.pipe_terminated (pipe_);
}
void zmq::server_t::xread_activated (pipe_t *pipe_)
{
fq.activated (pipe_);
}
void zmq::server_t::xwrite_activated (pipe_t *pipe_)
{
outpipes_t::iterator it;
for (it = outpipes.begin (); it != outpipes.end (); ++it)
if (it->second.pipe == pipe_)
break;
zmq_assert (it != outpipes.end ());
zmq_assert (!it->second.active);
it->second.active = true;
}
int zmq::server_t::xsend (msg_t *msg_)
{
zmq_assert(!(msg_->flags () & msg_t::more));
// Find the pipe associated with the routing stored in the message.
uint32_t routing_id = msg_->get_routing_id();
outpipes_t::iterator it = outpipes.find (routing_id);
if (it != outpipes.end ()) {
if (!it->second.pipe->check_write ()) {
it->second.active = false;
errno = EAGAIN;
return -1;
}
}
else {
errno = EHOSTUNREACH;
return -1;
}
bool ok = it->second.pipe->write (msg_);
if (unlikely (!ok)) {
// Message failed to send - we must close it ourselves.
int rc = msg_->close ();
errno_assert (rc == 0);
} else {
it->second.pipe->flush ();
}
// Detach the message from the data buffer.
int rc = msg_->init ();
errno_assert (rc == 0);
return 0;
}
int zmq::server_t::xrecv (msg_t *msg_)
{
pipe_t *pipe = NULL;
int rc = fq.recvpipe (msg_, &pipe);
// Drop any messages with more flag
while (rc == 0 && msg_->flags () & msg_t::more) {
// drop all frames of the current multi-frame message
rc = fq.recvpipe (msg_, NULL);
while (rc == 0 && msg_->flags () & msg_t::more)
rc = fq.recvpipe (msg_, NULL);
// get the new message
if (rc == 0)
rc = fq.recvpipe (msg_, &pipe);
}
if (rc != 0)
return rc;
zmq_assert (pipe != NULL);
uint32_t routing_id = pipe->get_routing_id();
msg_->set_routing_id(routing_id);
return 0;
}
bool zmq::server_t::xhas_in ()
{
return fq.has_in ();
}
bool zmq::server_t::xhas_out ()
{
// In theory, SERVER socket is always ready for writing. Whether actual
// attempt to write succeeds depends on whitch pipe the message is going
// to be routed to.
return true;
}
zmq::blob_t zmq::server_t::get_credential () const
{
return fq.get_credential ();
}

86
src/server.hpp Normal file
View File

@ -0,0 +1,86 @@
/*
Copyright (c) 2007-2015 Contributors as noted in the AUTHORS file
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_SERVER_HPP_INCLUDED__
#define __ZMQ_SERVER_HPP_INCLUDED__
#include <map>
#include "socket_base.hpp"
#include "session_base.hpp"
#include "stdint.hpp"
#include "blob.hpp"
#include "msg.hpp"
#include "fq.hpp"
namespace zmq
{
class ctx_t;
class pipe_t;
// TODO: This class uses O(n) scheduling. Rewrite it to use O(1) algorithm.
class server_t :
public socket_base_t
{
public:
server_t (zmq::ctx_t *parent_, uint32_t tid_, int sid);
~server_t ();
// Overrides of functions from socket_base_t.
void xattach_pipe (zmq::pipe_t *pipe_, bool subscribe_to_all_);
int xsend (zmq::msg_t *msg_);
int xrecv (zmq::msg_t *msg_);
bool xhas_in ();
bool xhas_out ();
void xread_activated (zmq::pipe_t *pipe_);
void xwrite_activated (zmq::pipe_t *pipe_);
void xpipe_terminated (zmq::pipe_t *pipe_);
protected:
blob_t get_credential () const;
private:
// Fair queueing object for inbound pipes.
fq_t fq;
struct outpipe_t
{
zmq::pipe_t *pipe;
bool active;
};
// Outbound pipes indexed by the peer IDs.
typedef std::map <uint32_t, outpipe_t> outpipes_t;
outpipes_t outpipes;
// Routing IDs are generated. It's a simple increment and wrap-over
// algorithm. This value is the next ID to use (if not used already).
uint32_t next_rid;
server_t (const server_t&);
const server_t &operator = (const server_t&);
};
}
#endif

View File

@ -55,6 +55,8 @@ zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_,
case ZMQ_PULL:
case ZMQ_PAIR:
case ZMQ_STREAM:
case ZMQ_SERVER:
case ZMQ_CLIENT:
s = new (std::nothrow) session_base_t (io_thread_, active_,
socket_, options_, addr_);
break;
@ -297,7 +299,8 @@ int zmq::session_base_t::zap_connect ()
return -1;
}
if (peer.options.type != ZMQ_REP
&& peer.options.type != ZMQ_ROUTER) {
&& peer.options.type != ZMQ_ROUTER
&& peer.options.type != ZMQ_SERVER) {
errno = ECONNREFUSED;
return -1;
}

View File

@ -70,6 +70,8 @@
#include "xpub.hpp"
#include "xsub.hpp"
#include "stream.hpp"
#include "server.hpp"
#include "client.hpp"
bool zmq::socket_base_t::check_tag ()
{
@ -117,6 +119,12 @@ zmq::socket_base_t *zmq::socket_base_t::create (int type_, class ctx_t *parent_,
case ZMQ_STREAM:
s = new (std::nothrow) stream_t (parent_, tid_, sid_);
break;
case ZMQ_SERVER:
s = new (std::nothrow) server_t (parent_, tid_, sid_);
break;
case ZMQ_CLIENT:
s = new (std::nothrow) client_t (parent_, tid_, sid_);
break;
default:
errno = EINVAL;
return NULL;

View File

@ -645,6 +645,15 @@ int zmq_msg_set (zmq_msg_t *, int, int)
return -1;
}
int zmq_msg_set_routing_id (zmq_msg_t *msg_, uint32_t routing_id_)
{
return ((zmq::msg_t*) msg_)->set_routing_id(routing_id_);
}
uint32_t zmq_msg_get_routing_id(zmq_msg_t *msg_)
{
return ((zmq::msg_t*) msg_)->get_routing_id();
}
// Get message metadata string

View File

@ -0,0 +1,106 @@
/*
Copyright (c) 2007-2015 Contributors as noted in the AUTHORS file
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "testutil.hpp"
int send_msg(zmq_msg_t* msg, void* s, int flags, int value);
int main (void)
{
setup_test_environment();
void *ctx = zmq_ctx_new ();
assert (ctx);
void *client = zmq_socket (ctx, ZMQ_CLIENT);
void *dealer = zmq_socket (ctx, ZMQ_DEALER);
int rc;
rc = zmq_bind (client, "inproc://serverdropmore");
assert (rc == 0);
rc = zmq_connect (dealer, "inproc://serverdropmore");
assert (rc == 0);
zmq_msg_t msg;
rc = zmq_msg_init (&msg);
assert (rc == 0);
// we will send 2 3-frames messages and then single frame message, only last one should be received
rc = send_msg (&msg, dealer, ZMQ_SNDMORE, 1);
assert(rc == 1);
rc = send_msg (&msg, dealer, ZMQ_SNDMORE, 2);
assert(rc == 1);
rc = send_msg (&msg, dealer, 0, 3);
assert(rc == 1);
rc = send_msg (&msg, dealer, ZMQ_SNDMORE, 4);
assert(rc == 1);
rc = send_msg (&msg, dealer, ZMQ_SNDMORE, 5);
assert(rc == 1);
rc = send_msg (&msg, dealer, 0, 6);
assert(rc == 1);
rc = send_msg (&msg, dealer, 0, 7);
assert(rc == 1);
rc = zmq_msg_recv (&msg, client, 0);
assert (rc == 1);
assert(zmq_msg_more(&msg) == 0);
unsigned char* data = (unsigned char*)zmq_msg_data (&msg);
assert (data[0] == 7);
rc = zmq_msg_close (&msg);
assert (rc == 0);
rc = zmq_close (client);
assert (rc == 0);
rc = zmq_close (dealer);
assert (rc == 0);
rc = zmq_ctx_term (ctx);
assert (rc == 0);
return 0 ;
}
int send_msg(zmq_msg_t* msg, void* s, int flags, int value)
{
int rc = zmq_msg_close(msg);
if (rc != 0)
return rc;
zmq_msg_init_size(msg, 1);
if (rc != 0)
return rc;
unsigned char* data = (unsigned char*)zmq_msg_data(msg);
data[0] = (unsigned char)value;
return zmq_msg_send (msg, s, flags);
}

View File

@ -0,0 +1,85 @@
/*
Copyright (c) 2007-2015 Contributors as noted in the AUTHORS file
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "testutil.hpp"
int main (void)
{
setup_test_environment();
void *ctx = zmq_ctx_new ();
assert (ctx);
void *server = zmq_socket (ctx, ZMQ_SERVER);
void *client = zmq_socket (ctx, ZMQ_CLIENT);
int rc;
rc = zmq_bind (server, "tcp://127.0.0.1:5560");
assert (rc == 0);
rc = zmq_connect (client, "tcp://127.0.0.1:5560");
assert (rc == 0);
zmq_msg_t msg;
rc = zmq_msg_init_size(&msg,1);
assert (rc == 0);
char * data = (char *)zmq_msg_data(&msg);
data[0] = 1;
rc = zmq_msg_send(&msg, client, 0);
assert (rc == 1);
rc = zmq_msg_recv(&msg, server, 0);
assert (rc == 1);
uint32_t routing_id = zmq_msg_get_routing_id(&msg);
assert(routing_id != 0);
rc = zmq_msg_close(&msg);
assert (rc == 0);
rc = zmq_msg_init_size (&msg, 1);
assert (rc == 0);
data = (char *)zmq_msg_data(&msg);
data[0] = 2;
rc = zmq_msg_set_routing_id(&msg, routing_id);
assert (rc == 0);
rc = zmq_msg_send(&msg, server, 0);
assert (rc == 1);
rc = zmq_msg_recv(&msg, client, 0);
assert (rc == 1);
rc = zmq_close (server);
assert (rc == 0);
rc = zmq_close (client);
assert (rc == 0);
rc = zmq_ctx_term (ctx);
assert (rc == 0);
return 0 ;
}

View File

@ -0,0 +1,106 @@
/*
Copyright (c) 2007-2015 Contributors as noted in the AUTHORS file
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "testutil.hpp"
int send_msg(zmq_msg_t* msg, void* s, int flags, int value);
int main (void)
{
setup_test_environment();
void *ctx = zmq_ctx_new ();
assert (ctx);
void *server = zmq_socket (ctx, ZMQ_SERVER);
void *client = zmq_socket (ctx, ZMQ_DEALER);
int rc;
rc = zmq_bind (server, "inproc://serverdropmore");
assert (rc == 0);
rc = zmq_connect (client, "inproc://serverdropmore");
assert (rc == 0);
zmq_msg_t msg;
rc = zmq_msg_init (&msg);
assert (rc == 0);
// we will send 2 3-frames messages and then single frame message, only last one should be received
rc = send_msg (&msg, client, ZMQ_SNDMORE, 1);
assert(rc == 1);
rc = send_msg (&msg, client, ZMQ_SNDMORE, 2);
assert(rc == 1);
rc = send_msg (&msg, client, 0, 3);
assert(rc == 1);
rc = send_msg (&msg, client, ZMQ_SNDMORE, 4);
assert(rc == 1);
rc = send_msg (&msg, client, ZMQ_SNDMORE, 5);
assert(rc == 1);
rc = send_msg (&msg, client, 0, 6);
assert(rc == 1);
rc = send_msg (&msg, client, 0, 7);
assert(rc == 1);
rc = zmq_msg_recv (&msg, server, 0);
assert (rc == 1);
assert(zmq_msg_more(&msg) == 0);
unsigned char* data = (unsigned char*)zmq_msg_data (&msg);
assert (data[0] == 7);
rc = zmq_msg_close (&msg);
assert (rc == 0);
rc = zmq_close (server);
assert (rc == 0);
rc = zmq_close (client);
assert (rc == 0);
rc = zmq_ctx_term (ctx);
assert (rc == 0);
return 0 ;
}
int send_msg(zmq_msg_t* msg, void* s, int flags, int value)
{
int rc = zmq_msg_close(msg);
if (rc != 0)
return rc;
zmq_msg_init_size(msg, 1);
if (rc != 0)
return rc;
unsigned char* data = (unsigned char*)zmq_msg_data(msg);
data[0] = (unsigned char)value;
return zmq_msg_send (msg, s, flags);
}