mirror of
https://github.com/zeromq/libzmq.git
synced 2025-03-09 23:36:04 +00:00
Set and arrange propagation of thread safe sockets flag.
We use a distinct context initialisation function to specify all sockets derived therefrom will be thread safe. However the inheritance is done exclusively in the C interface. This is not really correct, but it is chosen to minimise interference with the existing C++ code, including any construct or other calls within the C++ code base. Semantically the C++ code should be unchanged, physically some data structures and extra methods are provided by they're only used from the C binding.
This commit is contained in:
parent
988efbc73a
commit
520ad3c2d7
@ -147,6 +147,7 @@ ZMQ_EXPORT int zmq_getmsgopt (zmq_msg_t *msg, int option, void *optval,
|
|||||||
/******************************************************************************/
|
/******************************************************************************/
|
||||||
|
|
||||||
ZMQ_EXPORT void *zmq_init (int io_threads);
|
ZMQ_EXPORT void *zmq_init (int io_threads);
|
||||||
|
ZMQ_EXPORT void *zmq_init_thread_safe (int io_threads);
|
||||||
ZMQ_EXPORT int zmq_term (void *context);
|
ZMQ_EXPORT int zmq_term (void *context);
|
||||||
|
|
||||||
/******************************************************************************/
|
/******************************************************************************/
|
||||||
|
10
src/ctx.cpp
10
src/ctx.cpp
@ -81,6 +81,16 @@ zmq::ctx_t::ctx_t (uint32_t io_threads_) :
|
|||||||
zmq_assert (rc == 0);
|
zmq_assert (rc == 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void zmq::ctx_t::set_thread_safe()
|
||||||
|
{
|
||||||
|
thread_safe_flag = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool zmq::ctx_t::get_thread_safe() const
|
||||||
|
{
|
||||||
|
return thread_safe_flag;
|
||||||
|
}
|
||||||
|
|
||||||
bool zmq::ctx_t::check_tag ()
|
bool zmq::ctx_t::check_tag ()
|
||||||
{
|
{
|
||||||
return tag == 0xbadcafe0;
|
return tag == 0xbadcafe0;
|
||||||
|
@ -99,6 +99,10 @@ namespace zmq
|
|||||||
reaper_tid = 1
|
reaper_tid = 1
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// create thread safe sockets
|
||||||
|
void set_thread_safe();
|
||||||
|
bool get_thread_safe() const;
|
||||||
|
|
||||||
~ctx_t ();
|
~ctx_t ();
|
||||||
private:
|
private:
|
||||||
|
|
||||||
@ -151,6 +155,8 @@ namespace zmq
|
|||||||
zmq::socket_base_t *log_socket;
|
zmq::socket_base_t *log_socket;
|
||||||
mutex_t log_sync;
|
mutex_t log_sync;
|
||||||
|
|
||||||
|
bool thread_safe_flag;
|
||||||
|
|
||||||
ctx_t (const ctx_t&);
|
ctx_t (const ctx_t&);
|
||||||
const ctx_t &operator = (const ctx_t&);
|
const ctx_t &operator = (const ctx_t&);
|
||||||
};
|
};
|
||||||
|
@ -874,6 +874,11 @@ void zmq::socket_base_t::extract_flags (msg_t *msg_)
|
|||||||
rcvmore = msg_->flags () & msg_t::more ? true : false;
|
rcvmore = msg_->flags () & msg_t::more ? true : false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void zmq::socket_base_t::set_thread_safe()
|
||||||
|
{
|
||||||
|
thread_safe_flag = true;
|
||||||
|
}
|
||||||
|
|
||||||
void zmq::socket_base_t::lock()
|
void zmq::socket_base_t::lock()
|
||||||
{
|
{
|
||||||
sync.lock();
|
sync.lock();
|
||||||
|
@ -96,6 +96,7 @@ namespace zmq
|
|||||||
void hiccuped (pipe_t *pipe_);
|
void hiccuped (pipe_t *pipe_);
|
||||||
void terminated (pipe_t *pipe_);
|
void terminated (pipe_t *pipe_);
|
||||||
bool thread_safe() const { return thread_safe_flag; }
|
bool thread_safe() const { return thread_safe_flag; }
|
||||||
|
void set_thread_safe(); // should be in constructor, here for compat
|
||||||
void lock();
|
void lock();
|
||||||
void unlock();
|
void unlock();
|
||||||
protected:
|
protected:
|
||||||
|
21
src/zmq.cpp
21
src/zmq.cpp
@ -90,7 +90,7 @@ const char *zmq_strerror (int errnum_)
|
|||||||
return zmq::errno_to_string (errnum_);
|
return zmq::errno_to_string (errnum_);
|
||||||
}
|
}
|
||||||
|
|
||||||
void *zmq_init (int io_threads_)
|
static zmq::ctx_t *inner_init (int io_threads_)
|
||||||
{
|
{
|
||||||
if (io_threads_ < 0) {
|
if (io_threads_ < 0) {
|
||||||
errno = EINVAL;
|
errno = EINVAL;
|
||||||
@ -139,7 +139,19 @@ void *zmq_init (int io_threads_)
|
|||||||
// Create 0MQ context.
|
// Create 0MQ context.
|
||||||
zmq::ctx_t *ctx = new (std::nothrow) zmq::ctx_t ((uint32_t) io_threads_);
|
zmq::ctx_t *ctx = new (std::nothrow) zmq::ctx_t ((uint32_t) io_threads_);
|
||||||
alloc_assert (ctx);
|
alloc_assert (ctx);
|
||||||
return (void*) ctx;
|
return ctx;
|
||||||
|
}
|
||||||
|
|
||||||
|
void *zmq_init (int io_threads_)
|
||||||
|
{
|
||||||
|
return (void*) inner_init (io_threads_);
|
||||||
|
}
|
||||||
|
|
||||||
|
void *zmq_init_thread_safe (int io_threads_)
|
||||||
|
{
|
||||||
|
zmq::ctx_t *ctx = inner_init (io_threads_);
|
||||||
|
ctx->set_thread_safe();
|
||||||
|
return (void*) ctx;
|
||||||
}
|
}
|
||||||
|
|
||||||
int zmq_term (void *ctx_)
|
int zmq_term (void *ctx_)
|
||||||
@ -174,7 +186,10 @@ void *zmq_socket (void *ctx_, int type_)
|
|||||||
errno = EFAULT;
|
errno = EFAULT;
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
return (void*) (((zmq::ctx_t*) ctx_)->create_socket (type_));
|
zmq::ctx_t *ctx = (zmq::ctx_t*) ctx_;
|
||||||
|
zmq::socket_base_t *s = ctx->create_socket (type_);
|
||||||
|
if (ctx->get_thread_safe ()) s->set_thread_safe ();
|
||||||
|
return (void*) s;
|
||||||
}
|
}
|
||||||
|
|
||||||
int zmq_close (void *s_)
|
int zmq_close (void *s_)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user