2014-03-15 10:46:21 -04:00
|
|
|
|
|
|
|
#ifndef __ZMQ_NORM_ENGINE_HPP_INCLUDED__
|
|
|
|
#define __ZMQ_NORM_ENGINE_HPP_INCLUDED__
|
|
|
|
|
|
|
|
#if defined ZMQ_HAVE_NORM
|
|
|
|
|
|
|
|
#include "io_object.hpp"
|
|
|
|
#include "i_engine.hpp"
|
|
|
|
#include "options.hpp"
|
|
|
|
#include "v2_decoder.hpp"
|
|
|
|
#include "v2_encoder.hpp"
|
|
|
|
|
2014-03-16 09:15:31 -04:00
|
|
|
#include <normApi.h>
|
2014-03-15 10:46:21 -04:00
|
|
|
|
|
|
|
namespace zmq
|
|
|
|
{
|
2018-02-01 11:46:09 +01:00
|
|
|
class io_thread_t;
|
2018-05-18 17:47:47 +02:00
|
|
|
class msg_t;
|
2018-02-01 11:46:09 +01:00
|
|
|
class session_base_t;
|
2016-09-17 08:46:54 +02:00
|
|
|
|
2019-12-24 10:39:26 +01:00
|
|
|
class norm_engine_t ZMQ_FINAL : public io_object_t, public i_engine
|
2018-02-01 11:46:09 +01:00
|
|
|
{
|
|
|
|
public:
|
|
|
|
norm_engine_t (zmq::io_thread_t *parent_, const options_t &options_);
|
2019-12-24 10:39:26 +01:00
|
|
|
~norm_engine_t () ZMQ_FINAL;
|
2018-02-01 11:46:09 +01:00
|
|
|
|
|
|
|
// create NORM instance, session, etc
|
|
|
|
int init (const char *network_, bool send, bool recv);
|
|
|
|
void shutdown ();
|
|
|
|
|
2020-05-13 17:32:06 +03:00
|
|
|
// Always reports false: this engine has no handshake stage.
bool has_handshake_stage () ZMQ_FINAL { return false; }
|
|
|
|
|
2018-02-01 11:46:09 +01:00
|
|
|
// i_engine interface implementation.
|
|
|
|
// Plug the engine to the session.
|
2019-12-24 10:39:26 +01:00
|
|
|
void plug (zmq::io_thread_t *io_thread_,
|
|
|
|
class session_base_t *session_) ZMQ_FINAL;
|
2018-02-01 11:46:09 +01:00
|
|
|
|
|
|
|
// Terminate and deallocate the engine. Note that 'detached'
|
|
|
|
// events are not fired on termination.
|
2019-12-24 10:39:26 +01:00
|
|
|
void terminate () ZMQ_FINAL;
|
2018-02-01 11:46:09 +01:00
|
|
|
|
|
|
|
// This method is called by the session to signalise that more
|
|
|
|
// messages can be written to the pipe.
|
2019-12-24 10:39:26 +01:00
|
|
|
bool restart_input () ZMQ_FINAL;
|
2018-02-01 11:46:09 +01:00
|
|
|
|
|
|
|
// This method is called by the session to signalise that there
|
|
|
|
// are messages to send available.
|
2019-12-24 10:39:26 +01:00
|
|
|
void restart_output () ZMQ_FINAL;
|
2016-09-17 08:46:54 +02:00
|
|
|
|
2019-12-24 10:39:26 +01:00
|
|
|
// No-op: the body is empty, so this engine does nothing for this notification.
void zap_msg_available () ZMQ_FINAL {}
|
2016-09-17 08:46:54 +02:00
|
|
|
|
2019-12-24 10:39:26 +01:00
|
|
|
const endpoint_uri_pair_t &get_endpoint () const ZMQ_FINAL;
|
2018-02-01 11:46:09 +01:00
|
|
|
|
|
|
|
// i_poll_events interface implementation.
|
|
|
|
// (we only need in_event() for NormEvent notification)
|
|
|
|
// (i.e., don't have any output events or timers (yet))
|
|
|
|
void in_event ();
|
|
|
|
|
|
|
|
private:
|
|
|
|
void unplug ();
|
|
|
|
void send_data ();
|
|
|
|
void recv_data (NormObjectHandle stream);
|
|
|
|
|
|
|
|
|
|
|
|
// BUFFER_SIZE is the element count of the tx_buffer char array declared
// among the sender-state members of this class.
enum
|
|
|
|
{
|
|
|
|
BUFFER_SIZE = 2048
|
|
|
|
};
|
|
|
|
|
|
|
|
// Used to keep track of streams from multiple senders
|
|
|
|
class NormRxStreamState
|
|
|
|
{
|
|
|
|
public:
|
2018-03-05 13:19:20 +01:00
|
|
|
NormRxStreamState (NormObjectHandle normStream,
|
|
|
|
int64_t maxMsgSize,
|
2019-06-27 00:34:56 -04:00
|
|
|
bool zeroCopy,
|
|
|
|
int inBatchSize);
|
2018-02-01 11:46:09 +01:00
|
|
|
~NormRxStreamState ();
|
2014-03-15 10:46:21 -04:00
|
|
|
|
2018-02-01 11:46:09 +01:00
|
|
|
// Accessor for the NORM object handle stored in norm_stream.
NormObjectHandle GetStreamHandle () const
{
    return norm_stream;
}
|
2014-03-15 10:46:21 -04:00
|
|
|
|
2018-02-01 11:46:09 +01:00
|
|
|
bool Init ();
|
2014-03-15 10:46:21 -04:00
|
|
|
|
2018-02-01 11:46:09 +01:00
|
|
|
// Stores the given flag in rx_ready.
void SetRxReady (bool state)
{
    rx_ready = state;
}
|
|
|
|
// Reports the current value of the rx_ready flag.
bool IsRxReady () const
{
    return rx_ready;
}
|
2014-03-15 10:46:21 -04:00
|
|
|
|
2018-02-01 11:46:09 +01:00
|
|
|
// Stores the given flag in in_sync.
void SetSync (bool state)
{
    in_sync = state;
}
|
|
|
|
// Reports the current value of the in_sync flag.
bool InSync () const
{
    return in_sync;
}
|
2016-09-17 08:46:54 +02:00
|
|
|
|
2018-02-01 11:46:09 +01:00
|
|
|
// These are used to feed data to decoder
|
|
|
|
// and its underlying "msg" buffer
|
|
|
|
// Returns a char pointer to offset buffer_count within the buffer that
// buffer_ptr points at. The unsigned char* -> char* conversion is spelled
// as a named cast so it is explicit and easy to search for.
char *AccessBuffer ()
{
    return reinterpret_cast<char *> (buffer_ptr + buffer_count);
}
|
2019-03-01 05:24:40 -05:00
|
|
|
// Returns buffer_size minus buffer_count.
size_t GetBytesNeeded () const
{
    const size_t remaining = buffer_size - buffer_count;
    return remaining;
}
|
2018-02-01 11:46:09 +01:00
|
|
|
// Advances buffer_count by the given number of bytes.
void IncrementBufferCount (size_t count)
{
    buffer_count = buffer_count + count;
}
|
|
|
|
// Returns the message object exposed by the decoder via zmq_decoder->msg ().
msg_t *AccessMsg ()
{
    msg_t *const decoder_msg = zmq_decoder->msg ();
    return decoder_msg;
}
|
|
|
|
// This invokes the decoder "decode" method
|
|
|
|
// returning 0 if more data is needed,
|
|
|
|
// 1 if the message is complete. If an error
|
|
|
|
// occurs the 'sync' is dropped and the
|
|
|
|
// decoder re-initialized
|
|
|
|
int Decode ();
|
2017-08-08 15:18:07 +01:00
|
|
|
|
2018-02-01 11:46:09 +01:00
|
|
|
class List
|
|
|
|
{
|
|
|
|
public:
|
|
|
|
List ();
|
|
|
|
~List ();
|
2016-09-17 08:46:54 +02:00
|
|
|
|
2018-02-01 11:46:09 +01:00
|
|
|
void Append (NormRxStreamState &item);
|
|
|
|
void Remove (NormRxStreamState &item);
|
2016-09-17 08:46:54 +02:00
|
|
|
|
2019-03-01 05:24:40 -05:00
|
|
|
// True when the list has no head element.
bool IsEmpty () const
{
    return head == NULL;
}
|
2016-09-17 08:46:54 +02:00
|
|
|
|
2018-02-01 11:46:09 +01:00
|
|
|
void Destroy ();
|
2016-09-17 08:46:54 +02:00
|
|
|
|
2018-02-01 11:46:09 +01:00
|
|
|
class Iterator
|
2014-03-15 10:46:21 -04:00
|
|
|
{
|
2018-02-01 11:46:09 +01:00
|
|
|
public:
|
|
|
|
Iterator (const List &list);
|
|
|
|
NormRxStreamState *GetNextItem ();
|
|
|
|
|
|
|
|
private:
|
|
|
|
NormRxStreamState *next_item;
|
|
|
|
};
|
|
|
|
friend class Iterator;
|
|
|
|
|
|
|
|
private:
|
|
|
|
NormRxStreamState *head;
|
|
|
|
NormRxStreamState *tail;
|
|
|
|
|
|
|
|
}; // end class zmq::norm_engine_t::NormRxStreamState::List
|
|
|
|
|
|
|
|
friend class List;
|
|
|
|
|
|
|
|
// Returns the list pointer stored in this stream state's 'list' member.
List *AccessList ()
{
    return list;
}
|
|
|
|
|
|
|
|
|
|
|
|
private:
|
|
|
|
NormObjectHandle norm_stream;
|
|
|
|
int64_t max_msg_size;
|
2018-03-05 13:19:20 +01:00
|
|
|
bool zero_copy;
|
2019-06-27 00:34:56 -04:00
|
|
|
int in_batch_size;
|
2018-02-01 11:46:09 +01:00
|
|
|
bool in_sync;
|
|
|
|
bool rx_ready;
|
|
|
|
v2_decoder_t *zmq_decoder;
|
|
|
|
bool skip_norm_sync;
|
|
|
|
unsigned char *buffer_ptr;
|
|
|
|
size_t buffer_size;
|
|
|
|
size_t buffer_count;
|
|
|
|
|
|
|
|
NormRxStreamState *prev;
|
|
|
|
NormRxStreamState *next;
|
|
|
|
NormRxStreamState::List *list;
|
|
|
|
|
|
|
|
}; // end class zmq::norm_engine_t::NormRxStreamState
|
|
|
|
|
2019-02-01 05:43:45 -05:00
|
|
|
const endpoint_uri_pair_t _empty_endpoint;
|
|
|
|
|
2018-02-01 11:46:09 +01:00
|
|
|
session_base_t *zmq_session;
|
|
|
|
options_t options;
|
|
|
|
NormInstanceHandle norm_instance;
|
|
|
|
handle_t norm_descriptor_handle;
|
|
|
|
NormSessionHandle norm_session;
|
|
|
|
bool is_sender;
|
|
|
|
bool is_receiver;
|
|
|
|
// Sender state
|
|
|
|
msg_t tx_msg;
|
|
|
|
v2_encoder_t zmq_encoder; // for tx messages (we use v2 for now)
|
|
|
|
NormObjectHandle norm_tx_stream;
|
|
|
|
bool tx_first_msg;
|
|
|
|
bool tx_more_bit;
|
|
|
|
bool zmq_output_ready; // zmq has msg(s) to send
|
|
|
|
bool norm_tx_ready; // norm has tx queue vacancy
|
|
|
|
// TBD - maybe we don't need this buffer if we can access the zmq message buffer directly?
|
|
|
|
char tx_buffer[BUFFER_SIZE];
|
|
|
|
unsigned int tx_index;
|
|
|
|
unsigned int tx_len;
|
|
|
|
|
|
|
|
// Receiver state
|
|
|
|
// Lists of norm rx streams from remote senders
|
|
|
|
bool zmq_input_ready; // zmq ready to receive msg(s)
|
|
|
|
NormRxStreamState::List
|
|
|
|
rx_pending_list; // rx streams waiting for data reception
|
|
|
|
NormRxStreamState::List
|
|
|
|
rx_ready_list; // rx streams ready for NormStreamRead()
|
|
|
|
NormRxStreamState::List
|
|
|
|
msg_ready_list; // rx streams w/ msg ready for push to zmq
|
|
|
|
|
|
|
|
|
|
|
|
}; // end class norm_engine_t
|
2014-03-15 10:46:21 -04:00
|
|
|
}
|
|
|
|
|
|
|
|
#endif // ZMQ_HAVE_NORM
|
|
|
|
|
|
|
|
#endif // !__ZMQ_NORM_ENGINE_HPP_INCLUDED__
|