//  libzmq/src/socket_base.hpp
//  (mirror of https://github.com/zeromq/libzmq.git)

/*
Copyright (c) 2007-2014 Contributors as noted in the AUTHORS file
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_SOCKET_BASE_HPP_INCLUDED__
#define __ZMQ_SOCKET_BASE_HPP_INCLUDED__
#include <string>
#include <map>
#include <stdarg.h>
#include "own.hpp"
#include "array.hpp"
#include "blob.hpp"
#include "stdint.hpp"
#include "poller.hpp"
#include "atomic_counter.hpp"
#include "i_poll_events.hpp"
#include "mailbox.hpp"
#include "stdint.hpp"
#include "clock.hpp"
#include "pipe.hpp"
//  Declared with C linkage: takes two opaque pointers (data, hint) and
//  returns nothing.
//  NOTE(review): presumably the deallocation callback for monitor event
//  payloads -- confirm against the definition.
extern "C" void zmq_free_event (void *data, void *hint);
namespace zmq
{
class ctx_t;
class msg_t;
class pipe_t;
2009-09-21 14:39:59 +02:00
class socket_base_t :
2010-08-11 14:09:56 +02:00
public own_t,
public array_item_t <>,
public i_poll_events,
public i_pipe_events
{
friend class reaper_t;
public:
// Returns false if object is not a socket.
bool check_tag ();
// Create a socket of a specified type.
static socket_base_t *create (int type_, zmq::ctx_t *parent_,
uint32_t tid_, int sid_);
// Returns the mailbox associated with this socket.
mailbox_t *get_mailbox ();
// Interrupt blocking call if the socket is stuck in one.
// This function can be called from a different thread!
void stop ();
2009-08-09 16:30:22 +02:00
// Interface for communication with the API layer.
2010-04-09 13:04:15 +02:00
int setsockopt (int option_, const void *optval_, size_t optvallen_);
int getsockopt (int option_, void *optval_, size_t *optvallen_);
2009-09-21 14:39:59 +02:00
int bind (const char *addr_);
int connect (const char *addr_);
int term_endpoint (const char *addr_);
int send (zmq::msg_t *msg_, int flags_);
int recv (zmq::msg_t *msg_, int flags_);
2009-09-21 14:39:59 +02:00
int close ();
// These functions are used by the polling mechanism to determine
// which events are to be reported from this socket.
bool has_in ();
bool has_out ();
// Using this function reaper thread ask the socket to regiter with
// its poller.
void start_reaping (poller_t *poller_);
// i_poll_events implementation. This interface is used when socket
// is handled by the poller in the reaper thread.
void in_event ();
void out_event ();
void timer_event (int id_);
2009-08-27 10:54:28 +02:00
// i_pipe_events interface implementation.
void read_activated (pipe_t *pipe_);
void write_activated (pipe_t *pipe_);
void hiccuped (pipe_t *pipe_);
2013-05-28 16:49:24 +02:00
void pipe_terminated (pipe_t *pipe_);
void lock();
void unlock();
int monitor (const char *endpoint_, int events_);
void set_fd(fd_t fd_);
fd_t fd();
void event_connected (std::string &addr_, int fd_);
void event_connect_delayed (std::string &addr_, int err_);
void event_connect_retried (std::string &addr_, int interval_);
void event_listening (std::string &addr_, int fd_);
void event_bind_failed (std::string &addr_, int err_);
void event_accepted (std::string &addr_, int fd_);
void event_accept_failed (std::string &addr_, int err_);
void event_closed (std::string &addr_, int fd_);
void event_close_failed (std::string &addr_, int fd_);
void event_disconnected (std::string &addr_, int fd_);
2009-09-21 14:39:59 +02:00
protected:
socket_base_t (zmq::ctx_t *parent_, uint32_t tid_, int sid_);
2009-09-21 14:39:59 +02:00
virtual ~socket_base_t ();
// Concrete algorithms for the x- methods are to be defined by
// individual socket types.
virtual void xattach_pipe (zmq::pipe_t *pipe_,
bool subscribe_to_all_ = false) = 0;
2009-09-21 14:39:59 +02:00
// The default implementation assumes there are no specific socket
// options for the particular socket type. If not so, override this
// method.
2009-09-21 14:39:59 +02:00
virtual int xsetsockopt (int option_, const void *optval_,
size_t optvallen_);
// The default implementation assumes that send is not supported.
virtual bool xhas_out ();
2012-11-09 17:08:03 +01:00
virtual int xsend (zmq::msg_t *msg_);
// The default implementation assumes that recv in not supported.
virtual bool xhas_in ();
2012-11-09 17:17:43 +01:00
virtual int xrecv (zmq::msg_t *msg_);
2009-09-02 16:16:25 +02:00
// Returns the credential for the peer from which we have received
// the last message. If no message has been received yet,
// the function returns empty credential.
virtual blob_t get_credential () const;
// i_pipe_events will be forwarded to these functions.
virtual void xread_activated (pipe_t *pipe_);
virtual void xwrite_activated (pipe_t *pipe_);
virtual void xhiccuped (pipe_t *pipe_);
2013-05-28 16:49:24 +02:00
virtual void xpipe_terminated (pipe_t *pipe_) = 0;
2010-08-11 14:09:56 +02:00
2010-08-12 15:03:51 +02:00
// Delay actual destruction of the socket.
void process_destroy ();
// Socket event data dispath
void monitor_event (zmq_event_t data_, const std::string& addr_);
// Monitor socket cleanup
void stop_monitor ();
2010-08-11 14:09:56 +02:00
private:
// Creates new endpoint ID and adds the endpoint to the map.
void add_endpoint (const char *addr_, own_t *endpoint_, pipe_t *pipe);
// Map of open endpoints.
typedef std::pair <own_t *, pipe_t*> endpoint_pipe_t;
typedef std::multimap <std::string, endpoint_pipe_t> endpoints_t;
endpoints_t endpoints;
2010-08-11 14:09:56 +02:00
// Map of open inproc endpoints.
typedef std::multimap <std::string, pipe_t *> inprocs_t;
inprocs_t inprocs;
// To be called after processing commands or invoking any command
// handlers explicitly. If required, it will deallocate the socket.
void check_destroy ();
// Moves the flags from the message to local variables,
// to be later retrieved by getsockopt.
void extract_flags (msg_t *msg_);
// Used to check whether the object is a socket.
uint32_t tag;
// If true, associated context was already terminated.
bool ctx_terminated;
2010-08-12 15:03:51 +02:00
// If true, object should have been already destroyed. However,
// destruction is delayed while we unwind the stack to the point
// where it doesn't intersect the object being destroyed.
bool destroyed;
// Parse URI string.
int parse_uri (const char *uri_, std::string &protocol_,
std::string &address_);
2010-08-11 14:09:56 +02:00
// Check whether transport protocol, as specified in connect or
// bind, is available and compatible with the socket type.
int check_protocol (const std::string &protocol_);
// Register the pipe with this socket.
void attach_pipe (zmq::pipe_t *pipe_, bool subscribe_to_all_ = false);
// Processes commands sent to this socket (if any). If timeout is -1,
// returns only after at least one command was processed.
// If throttle argument is true, commands are processed at most once
// in a predefined time period.
int process_commands (int timeout_, bool throttle_);
// Handlers for incoming commands.
void process_stop ();
void process_bind (zmq::pipe_t *pipe_);
void process_term (int linger_);
// Socket's mailbox object.
mailbox_t mailbox;
// List of attached pipes.
typedef array_t <pipe_t, 3> pipes_t;
pipes_t pipes;
// Reaper's poller and handle of this socket within it.
poller_t *poller;
poller_t::handle_t handle;
// Timestamp of when commands were processed the last time.
2010-09-26 16:55:54 +02:00
uint64_t last_tsc;
2009-08-27 10:54:28 +02:00
// Number of messages received since last command processing.
int ticks;
// True if the last message received had MORE flag set.
bool rcvmore;
// File descriptor if applicable
fd_t file_desc;
// Improves efficiency of time measurement.
clock_t clock;
// Monitor socket;
void *monitor_socket;
// Bitmask of events being monitored
int monitor_events;
// Last socket endpoint resolved URI
std::string last_endpoint;
socket_base_t (const socket_base_t&);
const socket_base_t &operator = (const socket_base_t&);
mutex_t sync;
};
}
#endif