mirror of
https://github.com/zeromq/libzmq.git
synced 2025-01-02 03:08:04 +08:00
Terminate context in a child process of fork() to replace file descriptors to not interfere with parent process's context
This commit is contained in:
parent
0478ee04f4
commit
ff2900fd52
16
src/ctx.cpp
16
src/ctx.cpp
@ -49,6 +49,9 @@ zmq::ctx_t::ctx_t () :
|
|||||||
io_thread_count (ZMQ_IO_THREADS_DFLT),
|
io_thread_count (ZMQ_IO_THREADS_DFLT),
|
||||||
ipv6 (false)
|
ipv6 (false)
|
||||||
{
|
{
|
||||||
|
#ifdef HAVE_FORK
|
||||||
|
pid = getpid();
|
||||||
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
bool zmq::ctx_t::check_tag ()
|
bool zmq::ctx_t::check_tag ()
|
||||||
@ -89,6 +92,19 @@ int zmq::ctx_t::terminate ()
|
|||||||
slot_sync.lock ();
|
slot_sync.lock ();
|
||||||
if (!starting) {
|
if (!starting) {
|
||||||
|
|
||||||
|
#ifdef HAVE_FORK
|
||||||
|
if (pid != getpid())
|
||||||
|
{
|
||||||
|
// we are a forked child process. Close all file descriptors
|
||||||
|
// inherited from the parent.
|
||||||
|
for (sockets_t::size_type i = 0; i != sockets.size (); i++)
|
||||||
|
{
|
||||||
|
sockets[i]->get_mailbox()->forked();
|
||||||
|
}
|
||||||
|
|
||||||
|
term_mailbox.forked();
|
||||||
|
}
|
||||||
|
#endif
|
||||||
// Check whether termination was already underway, but interrupted and now
|
// Check whether termination was already underway, but interrupted and now
|
||||||
// restarted.
|
// restarted.
|
||||||
bool restarted = terminating;
|
bool restarted = terminating;
|
||||||
|
@ -167,6 +167,11 @@ namespace zmq
|
|||||||
|
|
||||||
ctx_t (const ctx_t&);
|
ctx_t (const ctx_t&);
|
||||||
const ctx_t &operator = (const ctx_t&);
|
const ctx_t &operator = (const ctx_t&);
|
||||||
|
|
||||||
|
#ifdef HAVE_FORK
|
||||||
|
// the process that created this context. Used to detect forking.
|
||||||
|
pid_t pid;
|
||||||
|
#endif
|
||||||
};
|
};
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -48,6 +48,9 @@ zmq::kqueue_t::kqueue_t () :
|
|||||||
// Create event queue
|
// Create event queue
|
||||||
kqueue_fd = kqueue ();
|
kqueue_fd = kqueue ();
|
||||||
errno_assert (kqueue_fd != -1);
|
errno_assert (kqueue_fd != -1);
|
||||||
|
#ifdef HAVE_FORK
|
||||||
|
pid = getpid();
|
||||||
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
zmq::kqueue_t::~kqueue_t ()
|
zmq::kqueue_t::~kqueue_t ()
|
||||||
@ -161,6 +164,13 @@ void zmq::kqueue_t::loop ()
|
|||||||
timespec ts = {timeout / 1000, (timeout % 1000) * 1000000};
|
timespec ts = {timeout / 1000, (timeout % 1000) * 1000000};
|
||||||
int n = kevent (kqueue_fd, NULL, 0, &ev_buf [0], max_io_events,
|
int n = kevent (kqueue_fd, NULL, 0, &ev_buf [0], max_io_events,
|
||||||
timeout ? &ts: NULL);
|
timeout ? &ts: NULL);
|
||||||
|
#ifdef HAVE_FORK
|
||||||
|
if (unlikely(pid != getpid())) {
|
||||||
|
//printf("zmq::kqueue_t::loop aborting on forked child %d\n", (int)getpid());
|
||||||
|
// simply exit the loop in a forked process.
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
#endif
|
||||||
if (n == -1) {
|
if (n == -1) {
|
||||||
errno_assert (errno == EINTR);
|
errno_assert (errno == EINTR);
|
||||||
continue;
|
continue;
|
||||||
|
@ -25,6 +25,7 @@
|
|||||||
#if defined ZMQ_USE_KQUEUE
|
#if defined ZMQ_USE_KQUEUE
|
||||||
|
|
||||||
#include <vector>
|
#include <vector>
|
||||||
|
#include <unistd.h>
|
||||||
|
|
||||||
#include "fd.hpp"
|
#include "fd.hpp"
|
||||||
#include "thread.hpp"
|
#include "thread.hpp"
|
||||||
@ -94,6 +95,11 @@ namespace zmq
|
|||||||
|
|
||||||
kqueue_t (const kqueue_t&);
|
kqueue_t (const kqueue_t&);
|
||||||
const kqueue_t &operator = (const kqueue_t&);
|
const kqueue_t &operator = (const kqueue_t&);
|
||||||
|
|
||||||
|
#ifdef HAVE_FORK
|
||||||
|
// the process that created this context. Used to detect forking.
|
||||||
|
pid_t pid;
|
||||||
|
#endif
|
||||||
};
|
};
|
||||||
|
|
||||||
typedef kqueue_t poller_t;
|
typedef kqueue_t poller_t;
|
||||||
|
@ -82,4 +82,3 @@ int zmq::mailbox_t::recv (command_t *cmd_, int timeout_)
|
|||||||
zmq_assert (ok);
|
zmq_assert (ok);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -44,6 +44,13 @@ namespace zmq
|
|||||||
void send (const command_t &cmd_);
|
void send (const command_t &cmd_);
|
||||||
int recv (command_t *cmd_, int timeout_);
|
int recv (command_t *cmd_, int timeout_);
|
||||||
|
|
||||||
|
#ifdef HAVE_FORK
|
||||||
|
// close the file descriptors in the signaller. This is used in a forked
|
||||||
|
// child process to close the file descriptors so that they do not interfere
|
||||||
|
// with the context in the parent process.
|
||||||
|
void forked() { signaler.forked(); }
|
||||||
|
#endif
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
|
||||||
// The pipe to store actual commands.
|
// The pipe to store actual commands.
|
||||||
|
@ -31,6 +31,10 @@ zmq::reaper_t::reaper_t (class ctx_t *ctx_, uint32_t tid_) :
|
|||||||
|
|
||||||
mailbox_handle = poller->add_fd (mailbox.get_fd (), this);
|
mailbox_handle = poller->add_fd (mailbox.get_fd (), this);
|
||||||
poller->set_pollin (mailbox_handle);
|
poller->set_pollin (mailbox_handle);
|
||||||
|
|
||||||
|
#ifdef HAVE_FORK
|
||||||
|
pid = getpid();
|
||||||
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
zmq::reaper_t::~reaper_t ()
|
zmq::reaper_t::~reaper_t ()
|
||||||
@ -57,6 +61,13 @@ void zmq::reaper_t::stop ()
|
|||||||
void zmq::reaper_t::in_event ()
|
void zmq::reaper_t::in_event ()
|
||||||
{
|
{
|
||||||
while (true) {
|
while (true) {
|
||||||
|
#ifdef HAVE_FORK
|
||||||
|
if (unlikely(pid != getpid()))
|
||||||
|
{
|
||||||
|
//printf("zmq::reaper_t::in_event return in child process %d\n", (int)getpid());
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
// Get the next command. If there is none, exit.
|
// Get the next command. If there is none, exit.
|
||||||
command_t cmd;
|
command_t cmd;
|
||||||
|
@ -72,6 +72,11 @@ namespace zmq
|
|||||||
|
|
||||||
reaper_t (const reaper_t&);
|
reaper_t (const reaper_t&);
|
||||||
const reaper_t &operator = (const reaper_t&);
|
const reaper_t &operator = (const reaper_t&);
|
||||||
|
|
||||||
|
#ifdef HAVE_FORK
|
||||||
|
// the process that created this context. Used to detect forking.
|
||||||
|
pid_t pid;
|
||||||
|
#endif
|
||||||
};
|
};
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -86,6 +86,10 @@ zmq::signaler_t::signaler_t ()
|
|||||||
// Set both fds to non-blocking mode.
|
// Set both fds to non-blocking mode.
|
||||||
unblock_socket (w);
|
unblock_socket (w);
|
||||||
unblock_socket (r);
|
unblock_socket (r);
|
||||||
|
|
||||||
|
#ifdef HAVE_FORK
|
||||||
|
pid = getpid();
|
||||||
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
zmq::signaler_t::~signaler_t ()
|
zmq::signaler_t::~signaler_t ()
|
||||||
@ -117,6 +121,12 @@ zmq::fd_t zmq::signaler_t::get_fd ()
|
|||||||
|
|
||||||
void zmq::signaler_t::send ()
|
void zmq::signaler_t::send ()
|
||||||
{
|
{
|
||||||
|
#if HAVE_FORK
|
||||||
|
if (unlikely(pid != getpid())) {
|
||||||
|
//printf("Child process %d signaler_t::send returning without sending #1\n", getpid());
|
||||||
|
return; // do not send anything in forked child context
|
||||||
|
}
|
||||||
|
#endif
|
||||||
#if defined ZMQ_HAVE_EVENTFD
|
#if defined ZMQ_HAVE_EVENTFD
|
||||||
const uint64_t inc = 1;
|
const uint64_t inc = 1;
|
||||||
ssize_t sz = write (w, &inc, sizeof (inc));
|
ssize_t sz = write (w, &inc, sizeof (inc));
|
||||||
@ -132,6 +142,13 @@ void zmq::signaler_t::send ()
|
|||||||
ssize_t nbytes = ::send (w, &dummy, sizeof (dummy), 0);
|
ssize_t nbytes = ::send (w, &dummy, sizeof (dummy), 0);
|
||||||
if (unlikely (nbytes == -1 && errno == EINTR))
|
if (unlikely (nbytes == -1 && errno == EINTR))
|
||||||
continue;
|
continue;
|
||||||
|
#if HAVE_FORK
|
||||||
|
if (unlikely(pid != getpid())) {
|
||||||
|
//printf("Child process %d signaler_t::send returning without sending #2\n", getpid());
|
||||||
|
errno = EINTR;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
#endif
|
||||||
zmq_assert (nbytes == sizeof (dummy));
|
zmq_assert (nbytes == sizeof (dummy));
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
@ -140,6 +157,17 @@ void zmq::signaler_t::send ()
|
|||||||
|
|
||||||
int zmq::signaler_t::wait (int timeout_)
|
int zmq::signaler_t::wait (int timeout_)
|
||||||
{
|
{
|
||||||
|
#ifdef HAVE_FORK
|
||||||
|
if (unlikely(pid != getpid()))
|
||||||
|
{
|
||||||
|
// we have forked and the file descriptor is closed. Emulate an interupt
|
||||||
|
// response.
|
||||||
|
//printf("Child process %d signaler_t::wait returning simulating interrupt #1\n", getpid());
|
||||||
|
errno = EINTR;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
#ifdef ZMQ_SIGNALER_WAIT_BASED_ON_POLL
|
#ifdef ZMQ_SIGNALER_WAIT_BASED_ON_POLL
|
||||||
|
|
||||||
struct pollfd pfd;
|
struct pollfd pfd;
|
||||||
@ -155,6 +183,16 @@ int zmq::signaler_t::wait (int timeout_)
|
|||||||
errno = EAGAIN;
|
errno = EAGAIN;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
#ifdef HAVE_FORK
|
||||||
|
if (unlikely(pid != getpid()))
|
||||||
|
{
|
||||||
|
// we have forked and the file descriptor is closed. Emulate an interupt
|
||||||
|
// response.
|
||||||
|
//printf("Child process %d signaler_t::wait returning simulating interrupt #2\n", getpid());
|
||||||
|
errno = EINTR;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
#endif
|
||||||
zmq_assert (rc == 1);
|
zmq_assert (rc == 1);
|
||||||
zmq_assert (pfd.revents & POLLIN);
|
zmq_assert (pfd.revents & POLLIN);
|
||||||
return 0;
|
return 0;
|
||||||
@ -225,6 +263,30 @@ void zmq::signaler_t::recv ()
|
|||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#ifdef HAVE_FORK
|
||||||
|
void zmq::signaler_t::forked()
|
||||||
|
{
|
||||||
|
int oldr = r;
|
||||||
|
int oldw = w;
|
||||||
|
|
||||||
|
// replace the file descriptors created in the parent with new
|
||||||
|
// ones, and close the inherited ones
|
||||||
|
make_fdpair(&r, &w);
|
||||||
|
#if defined ZMQ_HAVE_EVENTFD
|
||||||
|
int rc = close (oldr);
|
||||||
|
errno_assert (rc == 0);
|
||||||
|
#else
|
||||||
|
int rc = close (oldw);
|
||||||
|
errno_assert (rc == 0);
|
||||||
|
rc = close (oldr);
|
||||||
|
errno_assert (rc == 0);
|
||||||
|
#endif
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_)
|
int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_)
|
||||||
{
|
{
|
||||||
#if defined ZMQ_HAVE_EVENTFD
|
#if defined ZMQ_HAVE_EVENTFD
|
||||||
|
@ -20,6 +20,10 @@
|
|||||||
#ifndef __ZMQ_SIGNALER_HPP_INCLUDED__
|
#ifndef __ZMQ_SIGNALER_HPP_INCLUDED__
|
||||||
#define __ZMQ_SIGNALER_HPP_INCLUDED__
|
#define __ZMQ_SIGNALER_HPP_INCLUDED__
|
||||||
|
|
||||||
|
#ifdef HAVE_FORK
|
||||||
|
#include <unistd.h>
|
||||||
|
#endif
|
||||||
|
|
||||||
#include "fd.hpp"
|
#include "fd.hpp"
|
||||||
|
|
||||||
namespace zmq
|
namespace zmq
|
||||||
@ -42,6 +46,12 @@ namespace zmq
|
|||||||
int wait (int timeout_);
|
int wait (int timeout_);
|
||||||
void recv ();
|
void recv ();
|
||||||
|
|
||||||
|
#ifdef HAVE_FORK
|
||||||
|
// close the file descriptors in a forked child process so that they
|
||||||
|
// do not interfere with the context in the parent process.
|
||||||
|
void forked();
|
||||||
|
#endif
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
|
||||||
// Creates a pair of filedescriptors that will be used
|
// Creates a pair of filedescriptors that will be used
|
||||||
@ -55,6 +65,14 @@ namespace zmq
|
|||||||
// Disable copying of signaler_t object.
|
// Disable copying of signaler_t object.
|
||||||
signaler_t (const signaler_t&);
|
signaler_t (const signaler_t&);
|
||||||
const signaler_t &operator = (const signaler_t&);
|
const signaler_t &operator = (const signaler_t&);
|
||||||
|
|
||||||
|
#ifdef HAVE_FORK
|
||||||
|
// the process that created this context. Used to detect forking.
|
||||||
|
pid_t pid;
|
||||||
|
// idempotent close of file descriptors that is safe to use by destructor
|
||||||
|
// and forked().
|
||||||
|
void close_internal();
|
||||||
|
#endif
|
||||||
};
|
};
|
||||||
|
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user