diff --git a/include/zmq.hpp b/include/zmq.hpp
index ccc234df..004706bf 100644
--- a/include/zmq.hpp
+++ b/include/zmq.hpp
@@ -38,7 +38,7 @@ namespace zmq
message_delimiter = 1 << ZMQ_DELIMITER
};
- class no_memory : public exception
+ class no_memory : public std::exception
{
virtual const char *what ()
{
@@ -46,7 +46,7 @@ namespace zmq
}
};
- class invalid_argument : public exception
+ class invalid_argument : public std::exception
{
virtual const char *what ()
{
@@ -54,7 +54,7 @@ namespace zmq
}
};
- class too_many_threads : public exception
+ class too_many_threads : public std::exception
{
virtual const char *what ()
{
@@ -62,7 +62,7 @@ namespace zmq
}
};
- class address_in_use : public exception
+ class address_in_use : public std::exception
{
virtual const char *what ()
{
diff --git a/src/Makefile.am b/src/Makefile.am
index bde9c39b..47037a24 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -7,14 +7,15 @@ libzmq_la_SOURCES = \
atomic_ptr.hpp \
command.hpp \
config.hpp \
- context.hpp \
decoder.hpp \
devpoll.hpp \
+ dispatcher.hpp \
encoder.hpp \
epoll.hpp \
err.hpp \
fd.hpp \
fd_signaler.hpp \
+ io_object.hpp \
io_thread.hpp \
ip.hpp \
i_api.hpp \
@@ -31,6 +32,7 @@ libzmq_la_SOURCES = \
poll.hpp \
select.hpp \
simple_semaphore.hpp \
+ socket_base.hpp \
stdint.hpp \
tcp_connecter.hpp \
tcp_listener.hpp \
@@ -42,25 +44,29 @@ libzmq_la_SOURCES = \
ypipe.hpp \
ypollset.hpp \
yqueue.hpp \
+ zmq_listener.hpp \
app_thread.cpp \
- context.cpp \
- devpoll.hpp \
+ devpoll.cpp \
+ dispatcher.cpp \
epoll.cpp \
err.cpp \
fd_signaler.cpp \
+ io_object.cpp \
io_thread.cpp \
ip.cpp \
kqueue.cpp \
object.cpp \
poll.cpp \
select.cpp \
+ socket_base.cpp \
tcp_connecter.cpp \
tcp_listener.cpp \
tcp_socket.cpp \
thread.cpp \
uuid.cpp \
ypollset.cpp \
- zmq.cpp
+ zmq.cpp \
+ zmq_listener.cpp
libzmq_la_LDFLAGS = -version-info 0:0:0
libzmq_la_CXXFLAGS = -Wall -pedantic -Werror @ZMQ_EXTRA_CXXFLAGS@
diff --git a/src/app_thread.cpp b/src/app_thread.cpp
index 23a055a8..3f769704 100644
--- a/src/app_thread.cpp
+++ b/src/app_thread.cpp
@@ -17,6 +17,8 @@
along with this program. If not, see .
*/
+#include
+
#include "../include/zmq.h"
#if defined ZMQ_HAVE_WINDOWS
@@ -26,10 +28,12 @@
#endif
#include "app_thread.hpp"
-#include "context.hpp"
+#include "i_api.hpp"
+#include "dispatcher.hpp"
#include "err.hpp"
#include "pipe.hpp"
#include "config.hpp"
+#include "socket_base.hpp"
// If the RDTSC is available we use it to prevent excessive
// polling for commands. The nice thing here is that it will work on any
@@ -39,8 +43,8 @@
#define ZMQ_DELAY_COMMANDS
#endif
-zmq::app_thread_t::app_thread_t (context_t *context_, int thread_slot_) :
- object_t (context_, thread_slot_),
+zmq::app_thread_t::app_thread_t (dispatcher_t *dispatcher_, int thread_slot_) :
+ object_t (dispatcher_, thread_slot_),
tid (0),
last_processing_time (0)
{
@@ -48,13 +52,9 @@ zmq::app_thread_t::app_thread_t (context_t *context_, int thread_slot_) :
zmq::app_thread_t::~app_thread_t ()
{
- // Ask all the sockets to start termination, then wait till it is complete.
- for (sockets_t::iterator it = sockets.begin (); it != sockets.end (); it ++)
- (*it)->stop ();
+ // Destroy all the sockets owned by this application thread.
for (sockets_t::iterator it = sockets.begin (); it != sockets.end (); it ++)
delete *it;
-
- delete this;
}
zmq::i_signaler *zmq::app_thread_t::get_signaler ()
@@ -123,9 +123,28 @@ void zmq::app_thread_t::process_commands (bool block_)
for (int i = 0; i != thread_slot_count (); i++) {
if (signals & (ypollset_t::signals_t (1) << i)) {
command_t cmd;
- while (context->read (i, get_thread_slot (), &cmd))
+ while (dispatcher->read (i, get_thread_slot (), &cmd))
cmd.destination->process_command (cmd);
}
}
}
}
+
+zmq::i_api *zmq::app_thread_t::create_socket (int type_)
+{
+ // TODO: type is ignored for the time being.
+ socket_base_t *s = new socket_base_t (this);
+ zmq_assert (s);
+ sockets.push_back (s);
+ return s;
+}
+
+void zmq::app_thread_t::remove_socket (i_api *socket_)
+{
+ // TODO: To speed this up we can possibly use the system where each socket
+ // holds its index (see I/O scheduler implementation).
+ sockets_t::iterator it = std::find (sockets.begin (), sockets.end (),
+ socket_);
+ zmq_assert (it != sockets.end ());
+ sockets.erase (it);
+}
diff --git a/src/app_thread.hpp b/src/app_thread.hpp
index 31679b8f..59e4a255 100644
--- a/src/app_thread.hpp
+++ b/src/app_thread.hpp
@@ -22,7 +22,6 @@
#include
-#include "i_socket.hpp"
#include "stdint.hpp"
#include "object.hpp"
#include "ypollset.hpp"
@@ -34,7 +33,7 @@ namespace zmq
{
public:
- app_thread_t (class context_t *context_, int thread_slot_);
+ app_thread_t (class dispatcher_t *dispatcher_, int thread_slot_);
~app_thread_t ();
@@ -42,7 +41,7 @@ namespace zmq
i_signaler *get_signaler ();
// Nota bene: Following two functions are accessed from different
- // threads. The caller (context) is responsible for synchronisation
+ // threads. The caller (dispatcher) is responsible for synchronisation
// of accesses.
// Returns true is current thread is associated with the app thread.
@@ -56,10 +55,16 @@ namespace zmq
// set to true, returns only after at least one command was processed.
void process_commands (bool block_);
+ // Create a socket of a specified type.
+ struct i_api *create_socket (int type_);
+
+ // Unregister the socket from the app_thread (called by socket itself).
+ void remove_socket (struct i_api *socket_);
+
private:
// All the sockets created from this application thread.
- typedef std::vector sockets_t;
+ typedef std::vector sockets_t;
sockets_t sockets;
// Thread ID associated with this slot.
diff --git a/src/command.hpp b/src/command.hpp
index 69c4e57e..de94ca3a 100644
--- a/src/command.hpp
+++ b/src/command.hpp
@@ -35,60 +35,49 @@ namespace zmq
enum type_t
{
stop,
+ plug,
+ own,
bind,
- head,
- tail,
- reg,
- reg_and_bind,
- unreg,
- engine,
- terminate,
- terminate_ack
+ term_req,
+ term,
+ term_ack
+
} type;
union {
+ // Sent to I/O thread to let it know that it should
+ // terminate itself.
struct {
} stop;
+ // Sent to I/O object to make it register with its I/O thread.
+ struct {
+ } plug;
+
+ // Sent to socket to let it know about the newly created object.
+ struct {
+ class object_t *object;
+ } own;
+
+ // Sent between objects to establish pipe(s) between them.
struct {
- class pipe_reader_t *reader;
- class session_t *peer;
} bind;
+ // Sent by I/O object ot the socket to request the shutdown of
+ // the I/O object.
struct {
- uint64_t bytes;
- } tail;
+ class object_t *object;
+ } term_req;
+ // Sent by socket to I/O object to start its shutdown.
struct {
- uint64_t bytes;
- } head;
+ } term;
+ // Sent by I/O object to the socket to acknowledge it has
+ // shut down.
struct {
- class simple_semaphore_t *smph;
- } reg;
-
- struct {
- class session_t *peer;
- bool flow_in;
- bool flow_out;
- } reg_and_bind;
-
- struct {
- class simple_semaphore_t *smph;
- } unreg;
-
- // TODO: Engine object won't be deallocated on terminal shutdown
- // while the command is still on the fly!
- struct {
- class i_engine *engine;
- } engine;
-
- struct {
- } terminate;
-
- struct {
- } terminate_ack;
+ } term_ack;
} args;
};
diff --git a/src/context.cpp b/src/dispatcher.cpp
similarity index 90%
rename from src/context.cpp
rename to src/dispatcher.cpp
index 6b071cf6..0b68880d 100644
--- a/src/context.cpp
+++ b/src/dispatcher.cpp
@@ -19,7 +19,7 @@
#include "../include/zmq.h"
-#include "context.hpp"
+#include "dispatcher.hpp"
#include "i_api.hpp"
#include "app_thread.hpp"
#include "io_thread.hpp"
@@ -31,7 +31,7 @@
#include "windows.h"
#endif
-zmq::context_t::context_t (int app_threads_, int io_threads_)
+zmq::dispatcher_t::dispatcher_t (int app_threads_, int io_threads_)
{
#ifdef ZMQ_HAVE_WINDOWS
// Intialise Windows sockets. Note that WSAStartup can be called multiple
@@ -69,7 +69,7 @@ zmq::context_t::context_t (int app_threads_, int io_threads_)
io_threads [i]->start ();
}
-zmq::context_t::~context_t ()
+zmq::dispatcher_t::~dispatcher_t ()
{
// Close all application theads, sockets, io_objects etc.
for (app_threads_t::size_type i = 0; i != app_threads.size (); i++)
@@ -93,12 +93,12 @@ zmq::context_t::~context_t ()
#endif
}
-int zmq::context_t::thread_slot_count ()
+int zmq::dispatcher_t::thread_slot_count ()
{
return signalers.size ();
}
-zmq::i_api *zmq::context_t::create_socket (int type_)
+zmq::i_api *zmq::dispatcher_t::create_socket (int type_)
{
threads_sync.lock ();
app_thread_t *thread = choose_app_thread ();
@@ -106,16 +106,12 @@ zmq::i_api *zmq::context_t::create_socket (int type_)
threads_sync.unlock ();
return NULL;
}
-
- zmq_assert (false);
- i_api *s = NULL;
- //i_api *s = thread->create_socket (type_);
-
threads_sync.unlock ();
- return s;
+
+ return thread->create_socket (type_);
}
-zmq::app_thread_t *zmq::context_t::choose_app_thread ()
+zmq::app_thread_t *zmq::dispatcher_t::choose_app_thread ()
{
// Check whether thread ID is already assigned. If so, return it.
for (app_threads_t::size_type i = 0; i != app_threads.size (); i++)
@@ -132,7 +128,7 @@ zmq::app_thread_t *zmq::context_t::choose_app_thread ()
return NULL;
}
-zmq::io_thread_t *zmq::context_t::choose_io_thread (uint64_t taskset_)
+zmq::io_thread_t *zmq::dispatcher_t::choose_io_thread (uint64_t taskset_)
{
zmq_assert (io_threads.size () > 0);
diff --git a/src/context.hpp b/src/dispatcher.hpp
similarity index 87%
rename from src/context.hpp
rename to src/dispatcher.hpp
index f2eab1c0..08ffab12 100644
--- a/src/context.hpp
+++ b/src/dispatcher.hpp
@@ -17,8 +17,8 @@
along with this program. If not, see .
*/
-#ifndef __ZMQ_CONTEXT_HPP_INCLUDED__
-#define __ZMQ_CONTEXT_HPP_INCLUDED__
+#ifndef __ZMQ_DISPATCHER_HPP_INCLUDED__
+#define __ZMQ_DISPATCHER_HPP_INCLUDED__
#include
#include