mirror of
https://github.com/zeromq/libzmq.git
synced 2025-03-10 16:06:09 +00:00
Merge pull request #1334 from somdoron/ServerSocket
Problem: sockets are not thread safe
This commit is contained in:
commit
a6362a454f
@ -144,6 +144,8 @@ src_libzmq_la_SOURCES = \
|
|||||||
src/router.hpp \
|
src/router.hpp \
|
||||||
src/select.cpp \
|
src/select.cpp \
|
||||||
src/select.hpp \
|
src/select.hpp \
|
||||||
|
src/server.cpp \
|
||||||
|
src/server.hpp \
|
||||||
src/session_base.cpp \
|
src/session_base.cpp \
|
||||||
src/session_base.hpp \
|
src/session_base.hpp \
|
||||||
src/signaler.cpp \
|
src/signaler.cpp \
|
||||||
@ -338,7 +340,8 @@ test_apps = \
|
|||||||
tests/test_xpub_nodrop \
|
tests/test_xpub_nodrop \
|
||||||
tests/test_xpub_manual \
|
tests/test_xpub_manual \
|
||||||
tests/test_xpub_welcome_msg \
|
tests/test_xpub_welcome_msg \
|
||||||
tests/test_atomics
|
tests/test_atomics \
|
||||||
|
tests/test_server
|
||||||
|
|
||||||
tests_test_system_SOURCES = tests/test_system.cpp
|
tests_test_system_SOURCES = tests/test_system.cpp
|
||||||
tests_test_system_LDADD = src/libzmq.la
|
tests_test_system_LDADD = src/libzmq.la
|
||||||
@ -510,6 +513,9 @@ tests_test_xpub_welcome_msg_LDADD = src/libzmq.la
|
|||||||
tests_test_atomics_SOURCES = tests/test_atomics.cpp
|
tests_test_atomics_SOURCES = tests/test_atomics.cpp
|
||||||
tests_test_atomics_LDADD = src/libzmq.la
|
tests_test_atomics_LDADD = src/libzmq.la
|
||||||
|
|
||||||
|
tests_test_server_SOURCES = tests/test_server.cpp
|
||||||
|
tests_test_server_LDADD = src/libzmq.la
|
||||||
|
|
||||||
if !ON_MINGW
|
if !ON_MINGW
|
||||||
if !ON_CYGWIN
|
if !ON_CYGWIN
|
||||||
test_apps += \
|
test_apps += \
|
||||||
|
@ -217,6 +217,8 @@ ZMQ_EXPORT int zmq_msg_more (zmq_msg_t *msg);
|
|||||||
ZMQ_EXPORT int zmq_msg_get (zmq_msg_t *msg, int property);
|
ZMQ_EXPORT int zmq_msg_get (zmq_msg_t *msg, int property);
|
||||||
ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int property, int optval);
|
ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int property, int optval);
|
||||||
ZMQ_EXPORT const char *zmq_msg_gets (zmq_msg_t *msg, const char *property);
|
ZMQ_EXPORT const char *zmq_msg_gets (zmq_msg_t *msg, const char *property);
|
||||||
|
ZMQ_EXPORT int zmq_msg_set_routing_id(zmq_msg_t *msg, uint32_t routing_id);
|
||||||
|
ZMQ_EXPORT uint32_t zmq_msg_get_routing_id(zmq_msg_t *msg);
|
||||||
|
|
||||||
|
|
||||||
/******************************************************************************/
|
/******************************************************************************/
|
||||||
@ -236,6 +238,7 @@ ZMQ_EXPORT const char *zmq_msg_gets (zmq_msg_t *msg, const char *property);
|
|||||||
#define ZMQ_XPUB 9
|
#define ZMQ_XPUB 9
|
||||||
#define ZMQ_XSUB 10
|
#define ZMQ_XSUB 10
|
||||||
#define ZMQ_STREAM 11
|
#define ZMQ_STREAM 11
|
||||||
|
#define ZMQ_SERVER 12
|
||||||
|
|
||||||
/* Deprecated aliases */
|
/* Deprecated aliases */
|
||||||
#define ZMQ_XREQ ZMQ_DEALER
|
#define ZMQ_XREQ ZMQ_DEALER
|
||||||
|
@ -64,8 +64,8 @@ const char *zmq::mechanism_t::socket_type_string (int socket_type) const
|
|||||||
{
|
{
|
||||||
static const char *names [] = {"PAIR", "PUB", "SUB", "REQ", "REP",
|
static const char *names [] = {"PAIR", "PUB", "SUB", "REQ", "REP",
|
||||||
"DEALER", "ROUTER", "PULL", "PUSH",
|
"DEALER", "ROUTER", "PULL", "PUSH",
|
||||||
"XPUB", "XSUB", "STREAM"};
|
"XPUB", "XSUB", "STREAM", "SERVER"};
|
||||||
zmq_assert (socket_type >= 0 && socket_type <= 10);
|
zmq_assert (socket_type >= 0 && socket_type <= 12);
|
||||||
return names [socket_type];
|
return names [socket_type];
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -160,7 +160,7 @@ bool zmq::mechanism_t::check_socket_type (const std::string& type_) const
|
|||||||
case ZMQ_REP:
|
case ZMQ_REP:
|
||||||
return type_ == "REQ" || type_ == "DEALER";
|
return type_ == "REQ" || type_ == "DEALER";
|
||||||
case ZMQ_DEALER:
|
case ZMQ_DEALER:
|
||||||
return type_ == "REP" || type_ == "DEALER" || type_ == "ROUTER";
|
return type_ == "REP" || type_ == "DEALER" || type_ == "ROUTER" || type_ == "SERVER";
|
||||||
case ZMQ_ROUTER:
|
case ZMQ_ROUTER:
|
||||||
return type_ == "REQ" || type_ == "DEALER" || type_ == "ROUTER";
|
return type_ == "REQ" || type_ == "DEALER" || type_ == "ROUTER";
|
||||||
case ZMQ_PUSH:
|
case ZMQ_PUSH:
|
||||||
@ -177,6 +177,8 @@ bool zmq::mechanism_t::check_socket_type (const std::string& type_) const
|
|||||||
return type_ == "PUB" || type_ == "XPUB";
|
return type_ == "PUB" || type_ == "XPUB";
|
||||||
case ZMQ_PAIR:
|
case ZMQ_PAIR:
|
||||||
return type_ == "PAIR";
|
return type_ == "PAIR";
|
||||||
|
case ZMQ_SERVER:
|
||||||
|
return type_ == "DEALER";
|
||||||
default:
|
default:
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
17
src/msg.cpp
17
src/msg.cpp
@ -46,6 +46,7 @@ int zmq::msg_t::init ()
|
|||||||
u.vsm.type = type_vsm;
|
u.vsm.type = type_vsm;
|
||||||
u.vsm.flags = 0;
|
u.vsm.flags = 0;
|
||||||
u.vsm.size = 0;
|
u.vsm.size = 0;
|
||||||
|
u.vsm.routing_id = 0;
|
||||||
file_desc = -1;
|
file_desc = -1;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
@ -58,11 +59,13 @@ int zmq::msg_t::init_size (size_t size_)
|
|||||||
u.vsm.type = type_vsm;
|
u.vsm.type = type_vsm;
|
||||||
u.vsm.flags = 0;
|
u.vsm.flags = 0;
|
||||||
u.vsm.size = (unsigned char) size_;
|
u.vsm.size = (unsigned char) size_;
|
||||||
|
u.vsm.routing_id = 0;
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
u.lmsg.metadata = NULL;
|
u.lmsg.metadata = NULL;
|
||||||
u.lmsg.type = type_lmsg;
|
u.lmsg.type = type_lmsg;
|
||||||
u.lmsg.flags = 0;
|
u.lmsg.flags = 0;
|
||||||
|
u.lmsg.routing_id = 0;
|
||||||
u.lmsg.content =
|
u.lmsg.content =
|
||||||
(content_t*) malloc (sizeof (content_t) + size_);
|
(content_t*) malloc (sizeof (content_t) + size_);
|
||||||
if (unlikely (!u.lmsg.content)) {
|
if (unlikely (!u.lmsg.content)) {
|
||||||
@ -95,11 +98,13 @@ int zmq::msg_t::init_data (void *data_, size_t size_, msg_free_fn *ffn_,
|
|||||||
u.cmsg.flags = 0;
|
u.cmsg.flags = 0;
|
||||||
u.cmsg.data = data_;
|
u.cmsg.data = data_;
|
||||||
u.cmsg.size = size_;
|
u.cmsg.size = size_;
|
||||||
|
u.cmsg.routing_id = 0;
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
u.lmsg.metadata = NULL;
|
u.lmsg.metadata = NULL;
|
||||||
u.lmsg.type = type_lmsg;
|
u.lmsg.type = type_lmsg;
|
||||||
u.lmsg.flags = 0;
|
u.lmsg.flags = 0;
|
||||||
|
u.lmsg.routing_id = 0;
|
||||||
u.lmsg.content = (content_t*) malloc (sizeof (content_t));
|
u.lmsg.content = (content_t*) malloc (sizeof (content_t));
|
||||||
if (!u.lmsg.content) {
|
if (!u.lmsg.content) {
|
||||||
errno = ENOMEM;
|
errno = ENOMEM;
|
||||||
@ -121,6 +126,7 @@ int zmq::msg_t::init_delimiter ()
|
|||||||
u.delimiter.metadata = NULL;
|
u.delimiter.metadata = NULL;
|
||||||
u.delimiter.type = type_delimiter;
|
u.delimiter.type = type_delimiter;
|
||||||
u.delimiter.flags = 0;
|
u.delimiter.flags = 0;
|
||||||
|
u.delimiter.routing_id = 0;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -377,3 +383,14 @@ bool zmq::msg_t::rm_refs (int refs_)
|
|||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
uint32_t zmq::msg_t::get_routing_id()
|
||||||
|
{
|
||||||
|
return u.base.routing_id;
|
||||||
|
}
|
||||||
|
|
||||||
|
int zmq::msg_t::set_routing_id(uint32_t routing_id_)
|
||||||
|
{
|
||||||
|
u.base.routing_id = routing_id_;
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
17
src/msg.hpp
17
src/msg.hpp
@ -79,6 +79,8 @@ namespace zmq
|
|||||||
bool is_delimiter () const;
|
bool is_delimiter () const;
|
||||||
bool is_vsm ();
|
bool is_vsm ();
|
||||||
bool is_cmsg ();
|
bool is_cmsg ();
|
||||||
|
uint32_t get_routing_id();
|
||||||
|
int set_routing_id(uint32_t routing_id_);
|
||||||
|
|
||||||
// After calling this function you can copy the message in POD-style
|
// After calling this function you can copy the message in POD-style
|
||||||
// refs_ times. No need to call copy.
|
// refs_ times. No need to call copy.
|
||||||
@ -93,7 +95,7 @@ namespace zmq
|
|||||||
// Size in bytes of the largest message that is still copied around
|
// Size in bytes of the largest message that is still copied around
|
||||||
// rather than being reference-counted.
|
// rather than being reference-counted.
|
||||||
enum { msg_t_size = 64 };
|
enum { msg_t_size = 64 };
|
||||||
enum { max_vsm_size = msg_t_size - (8 + sizeof (metadata_t *) + 3) };
|
enum { max_vsm_size = msg_t_size - (8 + sizeof (metadata_t *) + 3 + sizeof(uint32_t)) };
|
||||||
|
|
||||||
// Shared message buffer. Message data are either allocated in one
|
// Shared message buffer. Message data are either allocated in one
|
||||||
// continuous block along with this structure - thus avoiding one
|
// continuous block along with this structure - thus avoiding one
|
||||||
@ -136,9 +138,10 @@ namespace zmq
|
|||||||
union {
|
union {
|
||||||
struct {
|
struct {
|
||||||
metadata_t *metadata;
|
metadata_t *metadata;
|
||||||
unsigned char unused [msg_t_size - (8 + sizeof (metadata_t *) + 2)];
|
unsigned char unused [msg_t_size - (8 + sizeof (metadata_t *) + 2 + sizeof(uint32_t))];
|
||||||
unsigned char type;
|
unsigned char type;
|
||||||
unsigned char flags;
|
unsigned char flags;
|
||||||
|
uint32_t routing_id;
|
||||||
} base;
|
} base;
|
||||||
struct {
|
struct {
|
||||||
metadata_t *metadata;
|
metadata_t *metadata;
|
||||||
@ -146,28 +149,32 @@ namespace zmq
|
|||||||
unsigned char size;
|
unsigned char size;
|
||||||
unsigned char type;
|
unsigned char type;
|
||||||
unsigned char flags;
|
unsigned char flags;
|
||||||
|
uint32_t routing_id;
|
||||||
} vsm;
|
} vsm;
|
||||||
struct {
|
struct {
|
||||||
metadata_t *metadata;
|
metadata_t *metadata;
|
||||||
content_t *content;
|
content_t *content;
|
||||||
unsigned char unused [msg_t_size - (8 + sizeof (metadata_t *) + sizeof (content_t*) + 2)];
|
unsigned char unused [msg_t_size - (8 + sizeof (metadata_t *) + sizeof (content_t*) + 2 + sizeof(uint32_t))];
|
||||||
unsigned char type;
|
unsigned char type;
|
||||||
unsigned char flags;
|
unsigned char flags;
|
||||||
|
uint32_t routing_id;
|
||||||
} lmsg;
|
} lmsg;
|
||||||
struct {
|
struct {
|
||||||
metadata_t *metadata;
|
metadata_t *metadata;
|
||||||
void* data;
|
void* data;
|
||||||
size_t size;
|
size_t size;
|
||||||
unsigned char unused
|
unsigned char unused
|
||||||
[msg_t_size - (8 + sizeof (metadata_t *) + sizeof (void*) + sizeof (size_t) + 2)];
|
[msg_t_size - (8 + sizeof (metadata_t *) + sizeof (void*) + sizeof (size_t) + 2 + sizeof(uint32_t))];
|
||||||
unsigned char type;
|
unsigned char type;
|
||||||
unsigned char flags;
|
unsigned char flags;
|
||||||
|
uint32_t routing_id;
|
||||||
} cmsg;
|
} cmsg;
|
||||||
struct {
|
struct {
|
||||||
metadata_t *metadata;
|
metadata_t *metadata;
|
||||||
unsigned char unused [msg_t_size - (8 + sizeof (metadata_t *) + 2)];
|
unsigned char unused [msg_t_size - (8 + sizeof (metadata_t *) + 2 + sizeof(uint32_t))];
|
||||||
unsigned char type;
|
unsigned char type;
|
||||||
unsigned char flags;
|
unsigned char flags;
|
||||||
|
uint32_t routing_id;
|
||||||
} delimiter;
|
} delimiter;
|
||||||
} u;
|
} u;
|
||||||
};
|
};
|
||||||
|
10
src/pipe.cpp
10
src/pipe.cpp
@ -100,6 +100,16 @@ void zmq::pipe_t::set_event_sink (i_pipe_events *sink_)
|
|||||||
sink = sink_;
|
sink = sink_;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void zmq::pipe_t::set_routing_id (uint32_t routing_id_)
|
||||||
|
{
|
||||||
|
routing_id = routing_id_;
|
||||||
|
}
|
||||||
|
|
||||||
|
uint32_t zmq::pipe_t::get_routing_id ()
|
||||||
|
{
|
||||||
|
return routing_id;
|
||||||
|
}
|
||||||
|
|
||||||
void zmq::pipe_t::set_identity (const blob_t &identity_)
|
void zmq::pipe_t::set_identity (const blob_t &identity_)
|
||||||
{
|
{
|
||||||
identity = identity_;
|
identity = identity_;
|
||||||
|
@ -74,6 +74,10 @@ namespace zmq
|
|||||||
// Specifies the object to send events to.
|
// Specifies the object to send events to.
|
||||||
void set_event_sink (i_pipe_events *sink_);
|
void set_event_sink (i_pipe_events *sink_);
|
||||||
|
|
||||||
|
// Pipe endpoint can store an routing ID to be used by its clients.
|
||||||
|
void set_routing_id(uint32_t routing_id_);
|
||||||
|
uint32_t get_routing_id();
|
||||||
|
|
||||||
// Pipe endpoint can store an opaque ID to be used by its clients.
|
// Pipe endpoint can store an opaque ID to be used by its clients.
|
||||||
void set_identity (const blob_t &identity_);
|
void set_identity (const blob_t &identity_);
|
||||||
blob_t get_identity ();
|
blob_t get_identity ();
|
||||||
@ -204,6 +208,9 @@ namespace zmq
|
|||||||
// Identity of the writer. Used uniquely by the reader side.
|
// Identity of the writer. Used uniquely by the reader side.
|
||||||
blob_t identity;
|
blob_t identity;
|
||||||
|
|
||||||
|
// Identity of the writer. Used uniquely by the reader side.
|
||||||
|
int routing_id;
|
||||||
|
|
||||||
// Pipe's credential.
|
// Pipe's credential.
|
||||||
blob_t credential;
|
blob_t credential;
|
||||||
|
|
||||||
|
169
src/server.cpp
Normal file
169
src/server.cpp
Normal file
@ -0,0 +1,169 @@
|
|||||||
|
/*
|
||||||
|
Copyright (c) 2007-2015 Contributors as noted in the AUTHORS file
|
||||||
|
|
||||||
|
This file is part of 0MQ.
|
||||||
|
|
||||||
|
0MQ is free software; you can redistribute it and/or modify it under
|
||||||
|
the terms of the GNU Lesser General Public License as published by
|
||||||
|
the Free Software Foundation; either version 3 of the License, or
|
||||||
|
(at your option) any later version.
|
||||||
|
|
||||||
|
0MQ is distributed in the hope that it will be useful,
|
||||||
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
GNU Lesser General Public License for more details.
|
||||||
|
|
||||||
|
You should have received a copy of the GNU Lesser General Public License
|
||||||
|
along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "server.hpp"
|
||||||
|
#include "pipe.hpp"
|
||||||
|
#include "wire.hpp"
|
||||||
|
#include "random.hpp"
|
||||||
|
#include "likely.hpp"
|
||||||
|
#include "err.hpp"
|
||||||
|
|
||||||
|
zmq::server_t::server_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
|
||||||
|
socket_base_t (parent_, tid_, sid_),
|
||||||
|
next_rid (generate_random ())
|
||||||
|
{
|
||||||
|
options.type = ZMQ_SERVER;
|
||||||
|
}
|
||||||
|
|
||||||
|
zmq::server_t::~server_t ()
|
||||||
|
{
|
||||||
|
zmq_assert (outpipes.empty ());
|
||||||
|
}
|
||||||
|
|
||||||
|
void zmq::server_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
|
||||||
|
{
|
||||||
|
// subscribe_to_all_ is unused
|
||||||
|
(void)subscribe_to_all_;
|
||||||
|
|
||||||
|
zmq_assert (pipe_);
|
||||||
|
|
||||||
|
uint32_t routing_id = next_rid++;
|
||||||
|
pipe_->set_routing_id (routing_id);
|
||||||
|
// Add the record into output pipes lookup table
|
||||||
|
outpipe_t outpipe = {pipe_, true};
|
||||||
|
bool ok = outpipes.insert (outpipes_t::value_type (routing_id, outpipe)).second;
|
||||||
|
zmq_assert (ok);
|
||||||
|
|
||||||
|
fq.attach (pipe_);
|
||||||
|
}
|
||||||
|
|
||||||
|
int zmq::server_t::xsetsockopt (int option_, const void *optval_,
|
||||||
|
size_t optvallen_)
|
||||||
|
{
|
||||||
|
errno = EINVAL;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void zmq::server_t::xpipe_terminated (pipe_t *pipe_)
|
||||||
|
{
|
||||||
|
outpipes_t::iterator it = outpipes.find (pipe_->get_routing_id ());
|
||||||
|
zmq_assert (it != outpipes.end ());
|
||||||
|
outpipes.erase (it);
|
||||||
|
fq.pipe_terminated (pipe_);
|
||||||
|
}
|
||||||
|
|
||||||
|
void zmq::server_t::xread_activated (pipe_t *pipe_)
|
||||||
|
{
|
||||||
|
fq.activated (pipe_);
|
||||||
|
}
|
||||||
|
|
||||||
|
void zmq::server_t::xwrite_activated (pipe_t *pipe_)
|
||||||
|
{
|
||||||
|
outpipes_t::iterator it;
|
||||||
|
for (it = outpipes.begin (); it != outpipes.end (); ++it)
|
||||||
|
if (it->second.pipe == pipe_)
|
||||||
|
break;
|
||||||
|
|
||||||
|
zmq_assert (it != outpipes.end ());
|
||||||
|
zmq_assert (!it->second.active);
|
||||||
|
it->second.active = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
int zmq::server_t::xsend (msg_t *msg_)
|
||||||
|
{
|
||||||
|
// Server doesn't support multipart
|
||||||
|
if (msg_->flags () & msg_t::more) {
|
||||||
|
errno = EINVAL;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Find the pipe associated with the routing stored in the message.
|
||||||
|
uint32_t routing_id = msg_->get_routing_id();
|
||||||
|
outpipes_t::iterator it = outpipes.find (routing_id);
|
||||||
|
|
||||||
|
if (it != outpipes.end ()) {
|
||||||
|
if (!it->second.pipe->check_write ()) {
|
||||||
|
it->second.active = false;
|
||||||
|
errno = EAGAIN;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
errno = EHOSTUNREACH;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool ok = it->second.pipe->write (msg_);
|
||||||
|
if (unlikely (!ok)) {
|
||||||
|
// Message failed to send - we must close it ourselves.
|
||||||
|
int rc = msg_->close ();
|
||||||
|
errno_assert (rc == 0);
|
||||||
|
} else {
|
||||||
|
it->second.pipe->flush ();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Detach the message from the data buffer.
|
||||||
|
int rc = msg_->init ();
|
||||||
|
errno_assert (rc == 0);
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int zmq::server_t::xrecv (msg_t *msg_)
|
||||||
|
{
|
||||||
|
pipe_t *pipe = NULL;
|
||||||
|
int rc = fq.recvpipe (msg_, &pipe);
|
||||||
|
|
||||||
|
if (rc != 0)
|
||||||
|
return -1;
|
||||||
|
|
||||||
|
zmq_assert (pipe != NULL);
|
||||||
|
|
||||||
|
if (msg_->flags () & msg_t::more) {
|
||||||
|
msg_->close();
|
||||||
|
msg_->init();
|
||||||
|
|
||||||
|
errno = EINVAL;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
uint32_t routing_id = pipe->get_routing_id();
|
||||||
|
msg_->set_routing_id(routing_id);
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool zmq::server_t::xhas_in ()
|
||||||
|
{
|
||||||
|
return fq.has_in ();
|
||||||
|
}
|
||||||
|
|
||||||
|
bool zmq::server_t::xhas_out ()
|
||||||
|
{
|
||||||
|
// In theory, SERVER socket is always ready for writing. Whether actual
|
||||||
|
// attempt to write succeeds depends on whitch pipe the message is going
|
||||||
|
// to be routed to.
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
zmq::blob_t zmq::server_t::get_credential () const
|
||||||
|
{
|
||||||
|
return fq.get_credential ();
|
||||||
|
}
|
87
src/server.hpp
Normal file
87
src/server.hpp
Normal file
@ -0,0 +1,87 @@
|
|||||||
|
/*
|
||||||
|
Copyright (c) 2007-2015 Contributors as noted in the AUTHORS file
|
||||||
|
|
||||||
|
This file is part of 0MQ.
|
||||||
|
|
||||||
|
0MQ is free software; you can redistribute it and/or modify it under
|
||||||
|
the terms of the GNU Lesser General Public License as published by
|
||||||
|
the Free Software Foundation; either version 3 of the License, or
|
||||||
|
(at your option) any later version.
|
||||||
|
|
||||||
|
0MQ is distributed in the hope that it will be useful,
|
||||||
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
GNU Lesser General Public License for more details.
|
||||||
|
|
||||||
|
You should have received a copy of the GNU Lesser General Public License
|
||||||
|
along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#ifndef __ZMQ_SERVER_HPP_INCLUDED__
|
||||||
|
#define __ZMQ_SERVER_HPP_INCLUDED__
|
||||||
|
|
||||||
|
#include <map>
|
||||||
|
|
||||||
|
#include "socket_base.hpp"
|
||||||
|
#include "session_base.hpp"
|
||||||
|
#include "stdint.hpp"
|
||||||
|
#include "blob.hpp"
|
||||||
|
#include "msg.hpp"
|
||||||
|
#include "fq.hpp"
|
||||||
|
|
||||||
|
namespace zmq
|
||||||
|
{
|
||||||
|
|
||||||
|
class ctx_t;
|
||||||
|
class pipe_t;
|
||||||
|
|
||||||
|
// TODO: This class uses O(n) scheduling. Rewrite it to use O(1) algorithm.
|
||||||
|
class server_t :
|
||||||
|
public socket_base_t
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
|
||||||
|
server_t (zmq::ctx_t *parent_, uint32_t tid_, int sid);
|
||||||
|
~server_t ();
|
||||||
|
|
||||||
|
// Overrides of functions from socket_base_t.
|
||||||
|
void xattach_pipe (zmq::pipe_t *pipe_, bool subscribe_to_all_);
|
||||||
|
int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
|
||||||
|
int xsend (zmq::msg_t *msg_);
|
||||||
|
int xrecv (zmq::msg_t *msg_);
|
||||||
|
bool xhas_in ();
|
||||||
|
bool xhas_out ();
|
||||||
|
void xread_activated (zmq::pipe_t *pipe_);
|
||||||
|
void xwrite_activated (zmq::pipe_t *pipe_);
|
||||||
|
void xpipe_terminated (zmq::pipe_t *pipe_);
|
||||||
|
|
||||||
|
protected:
|
||||||
|
|
||||||
|
blob_t get_credential () const;
|
||||||
|
|
||||||
|
private:
|
||||||
|
|
||||||
|
// Fair queueing object for inbound pipes.
|
||||||
|
fq_t fq;
|
||||||
|
|
||||||
|
struct outpipe_t
|
||||||
|
{
|
||||||
|
zmq::pipe_t *pipe;
|
||||||
|
bool active;
|
||||||
|
};
|
||||||
|
|
||||||
|
// Outbound pipes indexed by the peer IDs.
|
||||||
|
typedef std::map <uint32_t, outpipe_t> outpipes_t;
|
||||||
|
outpipes_t outpipes;
|
||||||
|
|
||||||
|
// Routing IDs are generated. It's a simple increment and wrap-over
|
||||||
|
// algorithm. This value is the next ID to use (if not used already).
|
||||||
|
uint32_t next_rid;
|
||||||
|
|
||||||
|
server_t (const server_t&);
|
||||||
|
const server_t &operator = (const server_t&);
|
||||||
|
};
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
#endif
|
@ -55,6 +55,7 @@ zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_,
|
|||||||
case ZMQ_PULL:
|
case ZMQ_PULL:
|
||||||
case ZMQ_PAIR:
|
case ZMQ_PAIR:
|
||||||
case ZMQ_STREAM:
|
case ZMQ_STREAM:
|
||||||
|
case ZMQ_SERVER:
|
||||||
s = new (std::nothrow) session_base_t (io_thread_, active_,
|
s = new (std::nothrow) session_base_t (io_thread_, active_,
|
||||||
socket_, options_, addr_);
|
socket_, options_, addr_);
|
||||||
break;
|
break;
|
||||||
@ -297,7 +298,8 @@ int zmq::session_base_t::zap_connect ()
|
|||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
if (peer.options.type != ZMQ_REP
|
if (peer.options.type != ZMQ_REP
|
||||||
&& peer.options.type != ZMQ_ROUTER) {
|
&& peer.options.type != ZMQ_ROUTER
|
||||||
|
&& peer.options.type != ZMQ_SERVER) {
|
||||||
errno = ECONNREFUSED;
|
errno = ECONNREFUSED;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -70,6 +70,7 @@
|
|||||||
#include "xpub.hpp"
|
#include "xpub.hpp"
|
||||||
#include "xsub.hpp"
|
#include "xsub.hpp"
|
||||||
#include "stream.hpp"
|
#include "stream.hpp"
|
||||||
|
#include "server.hpp"
|
||||||
|
|
||||||
bool zmq::socket_base_t::check_tag ()
|
bool zmq::socket_base_t::check_tag ()
|
||||||
{
|
{
|
||||||
@ -117,6 +118,9 @@ zmq::socket_base_t *zmq::socket_base_t::create (int type_, class ctx_t *parent_,
|
|||||||
case ZMQ_STREAM:
|
case ZMQ_STREAM:
|
||||||
s = new (std::nothrow) stream_t (parent_, tid_, sid_);
|
s = new (std::nothrow) stream_t (parent_, tid_, sid_);
|
||||||
break;
|
break;
|
||||||
|
case ZMQ_SERVER:
|
||||||
|
s = new (std::nothrow) server_t (parent_, tid_, sid_);
|
||||||
|
break;
|
||||||
default:
|
default:
|
||||||
errno = EINVAL;
|
errno = EINVAL;
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -645,6 +645,15 @@ int zmq_msg_set (zmq_msg_t *, int, int)
|
|||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int zmq_msg_set_routing_id (zmq_msg_t *msg_, uint32_t routing_id_)
|
||||||
|
{
|
||||||
|
return ((zmq::msg_t*) msg_)->set_routing_id(routing_id_);
|
||||||
|
}
|
||||||
|
|
||||||
|
uint32_t zmq_msg_get_routing_id(zmq_msg_t *msg_)
|
||||||
|
{
|
||||||
|
return ((zmq::msg_t*) msg_)->get_routing_id();
|
||||||
|
}
|
||||||
|
|
||||||
// Get message metadata string
|
// Get message metadata string
|
||||||
|
|
||||||
|
87
tests/test_server.cpp
Normal file
87
tests/test_server.cpp
Normal file
@ -0,0 +1,87 @@
|
|||||||
|
/*
|
||||||
|
Copyright (c) 2007-2015 Contributors as noted in the AUTHORS file
|
||||||
|
|
||||||
|
This file is part of 0MQ.
|
||||||
|
|
||||||
|
0MQ is free software; you can redistribute it and/or modify it under
|
||||||
|
the terms of the GNU Lesser General Public License as published by
|
||||||
|
the Free Software Foundation; either version 3 of the License, or
|
||||||
|
(at your option) any later version.
|
||||||
|
|
||||||
|
0MQ is distributed in the hope that it will be useful,
|
||||||
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
GNU Lesser General Public License for more details.
|
||||||
|
|
||||||
|
You should have received a copy of the GNU Lesser General Public License
|
||||||
|
along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "testutil.hpp"
|
||||||
|
|
||||||
|
int main (void)
|
||||||
|
{
|
||||||
|
printf("0000");
|
||||||
|
|
||||||
|
setup_test_environment();
|
||||||
|
void *ctx = zmq_ctx_new ();
|
||||||
|
assert (ctx);
|
||||||
|
|
||||||
|
void *server = zmq_socket (ctx, ZMQ_SERVER);
|
||||||
|
void *client = zmq_socket (ctx, ZMQ_DEALER);
|
||||||
|
|
||||||
|
int rc;
|
||||||
|
|
||||||
|
rc = zmq_bind (server, "tcp://127.0.0.1:5560");
|
||||||
|
assert (rc == 0);
|
||||||
|
|
||||||
|
rc = zmq_connect (client, "tcp://127.0.0.1:5560");
|
||||||
|
assert (rc == 0);
|
||||||
|
|
||||||
|
zmq_msg_t msg;
|
||||||
|
rc = zmq_msg_init_size(&msg,1);
|
||||||
|
assert (rc == 0);
|
||||||
|
|
||||||
|
char * data = (char *)zmq_msg_data(&msg);
|
||||||
|
data[0] = 1;
|
||||||
|
|
||||||
|
rc = zmq_msg_send(&msg, client, 0);
|
||||||
|
assert (rc == 1);
|
||||||
|
|
||||||
|
rc = zmq_msg_recv(&msg, server, 0);
|
||||||
|
assert (rc == 1);
|
||||||
|
|
||||||
|
uint32_t routing_id = zmq_msg_get_routing_id(&msg);
|
||||||
|
assert(routing_id != 0);
|
||||||
|
|
||||||
|
rc = zmq_msg_close(&msg);
|
||||||
|
assert (rc == 0);
|
||||||
|
|
||||||
|
rc = zmq_msg_init_size (&msg, 1);
|
||||||
|
assert (rc == 0);
|
||||||
|
|
||||||
|
data = (char *)zmq_msg_data(&msg);
|
||||||
|
data[0] = 2;
|
||||||
|
|
||||||
|
rc = zmq_msg_set_routing_id(&msg, routing_id);
|
||||||
|
assert (rc == 0);
|
||||||
|
|
||||||
|
rc = zmq_msg_send(&msg, server, 0);
|
||||||
|
assert (rc == 1);
|
||||||
|
|
||||||
|
rc = zmq_msg_recv(&msg, client, 0);
|
||||||
|
assert (rc == 1);
|
||||||
|
|
||||||
|
rc = zmq_close (server);
|
||||||
|
assert (rc == 0);
|
||||||
|
|
||||||
|
rc = zmq_close (client);
|
||||||
|
assert (rc == 0);
|
||||||
|
|
||||||
|
rc = zmq_ctx_term (ctx);
|
||||||
|
assert (rc == 0);
|
||||||
|
|
||||||
|
return 0 ;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
Loading…
x
Reference in New Issue
Block a user