0
0
mirror of https://github.com/zeromq/libzmq.git synced 2024-12-27 15:41:05 +08:00

multiple bugs fixed

This commit is contained in:
Martin Sustrik 2009-08-27 16:24:21 +02:00
parent 67194267f8
commit 2dd5016515
10 changed files with 61 additions and 15 deletions

View File

@ -27,9 +27,17 @@ namespace zmq
struct i_inout
{
// Engine asks to get a message to send to the network.
virtual bool read (::zmq_msg_t *msg_) = 0;
// Engine sends the incoming message further on downstream.
virtual bool write (::zmq_msg_t *msg_) = 0;
// Flush all the previously written messages downstream.
virtual void flush () = 0;
// Drop all the references to the engine.
virtual void detach () = 0;
};
}

View File

@ -42,8 +42,9 @@ namespace zmq
// Reads a message to the underlying pipe.
bool read (struct zmq_msg_t *msg_);
// Mnaipulation of index of the pipe.
void set_endpoint (i_endpoint *endpoint_);
// Mnaipulation of index of the pipe.
void set_index (int index_);
int get_index ();

View File

@ -26,7 +26,7 @@ zmq::session_t::session_t (object_t *parent_, socket_base_t *owner_,
const char *name_, const options_t &options_) :
owned_t (parent_, owner_),
in_pipe (NULL),
active (false),
active (true),
out_pipe (NULL),
engine (NULL),
name (name_),
@ -74,6 +74,16 @@ void zmq::session_t::flush ()
out_pipe->flush ();
}
void zmq::session_t::detach ()
{
// Engine is terminating itself.
engine = NULL;
// TODO: In the case od anonymous connection, terminate the session.
// if (anonymous)
// term ();
}
void zmq::session_t::revive (reader_t *pipe_)
{
zmq_assert (in_pipe == pipe_);
@ -98,6 +108,7 @@ void zmq::session_t::process_plug ()
pipe_t *inbound = new pipe_t (this, owner, options.hwm, options.lwm);
zmq_assert (inbound);
in_pipe = &inbound->reader;
in_pipe->set_endpoint (this);
pipe_t *outbound = new pipe_t (owner, this, options.hwm, options.lwm);
zmq_assert (outbound);
out_pipe = &outbound->writer;

View File

@ -48,6 +48,7 @@ namespace zmq
bool read (::zmq_msg_t *msg_);
bool write (::zmq_msg_t *msg_);
void flush ();
void detach ();
// i_endpoint interface implementation.
void revive (class reader_t *pipe_);

View File

@ -80,6 +80,12 @@ void zmq::zmq_connecter_init_t::flush ()
zmq_assert (false);
}
void zmq::zmq_connecter_init_t::detach ()
{
// TODO: Engine is closing down. Init object is to be closed as well.
zmq_assert (false);
}
void zmq::zmq_connecter_init_t::process_plug ()
{
zmq_assert (engine);

View File

@ -49,6 +49,7 @@ namespace zmq
bool read (::zmq_msg_t *msg_);
bool write (::zmq_msg_t *msg_);
void flush ();
void detach ();
// Handlers for incoming commands.
void process_plug ();

View File

@ -73,4 +73,3 @@ bool zmq::zmq_encoder_t::message_ready ()
}
return true;
}

View File

@ -136,5 +136,8 @@ void zmq::zmq_engine_t::revive ()
void zmq::zmq_engine_t::error ()
{
zmq_assert (false);
zmq_assert (inout);
inout->detach ();
unplug ();
delete this;
}

View File

@ -17,8 +17,6 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <string>
#include "zmq_listener_init.hpp"
#include "io_thread.hpp"
#include "session.hpp"
@ -27,7 +25,8 @@
zmq::zmq_listener_init_t::zmq_listener_init_t (io_thread_t *parent_,
socket_base_t *owner_, fd_t fd_, const options_t &options_) :
owned_t (parent_, owner_),
options (options_)
options (options_),
has_peer_identity (false)
{
// Create associated engine object.
engine = new zmq_engine_t (parent_, fd_);
@ -47,19 +46,33 @@ bool zmq::zmq_listener_init_t::read (::zmq_msg_t *msg_)
bool zmq::zmq_listener_init_t::write (::zmq_msg_t *msg_)
{
// Once we've got peer's identity we aren't interested in subsequent
// messages.
if (has_peer_identity)
return false;
// Retreieve the remote identity. We'll use it as a local session name.
std::string session_name = std::string ((const char*) zmq_msg_data (msg_),
has_peer_identity = true;
peer_identity.assign ((const char*) zmq_msg_data (msg_),
zmq_msg_size (msg_));
return true;
}
void zmq::zmq_listener_init_t::flush ()
{
zmq_assert (has_peer_identity);
// Initialisation is done. Disconnect the engine from the init object.
engine->unplug ();
// Have a look whether the session already exists. If it does, attach it
// to the engine. If it doesn't create it first.
session_t *session = owner->find_session (session_name.c_str ());
session_t *session = owner->find_session (peer_identity.c_str ());
if (!session) {
io_thread_t *io_thread = choose_io_thread (options.affinity);
session = new session_t (io_thread, owner, session_name.c_str (),
session = new session_t (io_thread, owner, peer_identity.c_str (),
options);
zmq_assert (session);
send_plug (session);
@ -73,14 +86,12 @@ bool zmq::zmq_listener_init_t::write (::zmq_msg_t *msg_)
// Destroy the init object.
term ();
return true;
}
void zmq::zmq_listener_init_t::flush ()
void zmq::zmq_listener_init_t::detach ()
{
// No need to do anything. zmq_listener_init_t does no batching
// of messages. Each message is processed immediately on write.
// TODO: Engine is closing down. Init object is to be closed as well.
zmq_assert (false);
}
void zmq::zmq_listener_init_t::process_plug ()

View File

@ -49,6 +49,7 @@ namespace zmq
bool read (::zmq_msg_t *msg_);
bool write (::zmq_msg_t *msg_);
void flush ();
void detach ();
// Handlers for incoming commands.
void process_plug ();
@ -62,6 +63,10 @@ namespace zmq
// Associated socket options.
options_t options;
// Indetity on the other end of the connection.
bool has_peer_identity;
std::string peer_identity;
zmq_listener_init_t (const zmq_listener_init_t&);
void operator = (const zmq_listener_init_t&);
};