diff --git a/configure.ac b/configure.ac index 963a6ead..96463fc1 100644 --- a/configure.ac +++ b/configure.ac @@ -463,6 +463,28 @@ fi AC_SUBST(pgm_basename) + +# This uses "--with-norm" to point to the "norm" directory +# for "norm/include" and "norm/lib" +#(if "--with-norm=yes" is given, then assume installed on system) +AC_ARG_WITH([norm], [AS_HELP_STRING([--with-norm], + [build libzmq with NORM protocol extension, optionally specifying norm path [default=no]])], + [with_norm_ext=$withval], [with_norm_ext=no]) + + +AC_MSG_CHECKING("with_norm_ext = ${with_norm_ext}") + +if test "x$with_norm_ext" != "xno"; then + AC_DEFINE(ZMQ_HAVE_NORM, 1, [Have NORM protocol extension]) + if test "x$wwith_norm_ext" != "xyes"; then + norm_path="${with_norm_ext}" + LIBZMQ_EXTRA_CXXFLAGS="-I${norm_path}/include ${LIBZMQ_EXTRA_CXXFLAGS}" + LIBZMQ_EXTRA_LDFLAGS="-I${norm_path}/include ${LIBZMQ_EXTRA_LDFLAGS}" + fi + LIBS="-lnorm $LIBS" +fi + + # Set -Wall, -Werror and -pedantic AC_LANG_PUSH([C++]) diff --git a/src/Makefile.am b/src/Makefile.am index 94b53a85..53390b20 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -45,6 +45,7 @@ libzmq_la_SOURCES = \ msg.hpp \ mtrie.hpp \ mutex.hpp \ + norm_engine.hpp \ null_mechanism.hpp \ object.hpp \ options.hpp \ @@ -112,6 +113,7 @@ libzmq_la_SOURCES = \ mechanism.cpp \ msg.cpp \ mtrie.cpp \ + norm_engine.cpp \ null_mechanism.cpp \ object.cpp \ options.cpp \ diff --git a/src/norm_engine.cpp b/src/norm_engine.cpp new file mode 100644 index 00000000..f1c99695 --- /dev/null +++ b/src/norm_engine.cpp @@ -0,0 +1,704 @@ + +#include "platform.hpp" + +#define ZMQ_HAVE_NORM 1 + +#if defined ZMQ_HAVE_NORM + +#include "norm_engine.hpp" +#include "session_base.hpp" +#include "v2_protocol.hpp" + +zmq::norm_engine_t::norm_engine_t(io_thread_t* parent_, + const options_t& options_) + : io_object_t(parent_), zmq_session(NULL), options(options_), + norm_instance(NORM_INSTANCE_INVALID), norm_session(NORM_SESSION_INVALID), + is_sender(false), is_receiver(false), + zmq_encoder(0), tx_more_bit(false), zmq_output_ready(false), + norm_tx_stream(NORM_OBJECT_INVALID), norm_tx_ready(false), + tx_index(0), tx_len(0), + zmq_input_ready(false) +{ + int rc = tx_msg.init(); + errno_assert(0 == rc); +} + +zmq::norm_engine_t::~norm_engine_t() +{ + shutdown(); // in case it was not already called +} + + +int zmq::norm_engine_t::init(const char* network_, bool send, bool recv) +{ + // Parse the "network_" address int "iface", "addr", and "port" + // norm endpoint format: [id,][;]: + // First, look for optional local NormNodeId + NormNodeId localId = NORM_NODE_ANY; + const char* ifacePtr = strchr(network_, ','); + if (NULL != ifacePtr) + { + size_t idLen = ifacePtr - network_; + if (idLen > 31) idLen = 31; + char idText[32]; + strncpy(idText, network_, idLen); + idText[idLen] = '\0'; + localId = (NormNodeId)atoi(idText); + ifacePtr++; + } + else + { + ifacePtr = network_; + } + + // Second, look for optional multicast ifaceName + char ifaceName[256]; + const char* addrPtr = strchr(ifacePtr, ';'); + if (NULL != addrPtr) + { + size_t ifaceLen = addrPtr - ifacePtr; + if (ifaceLen > 255) ifaceLen = 255; // return error instead? + strncpy(ifaceName, ifacePtr, ifaceLen); + ifaceName[ifaceLen] = '\0'; + ifacePtr = ifaceName; + addrPtr++; + } + else + { + addrPtr = ifacePtr; + ifacePtr = NULL; + } + + // Finally, parse IP address and port number + const char* portPtr = strrchr(addrPtr, ':'); + if (NULL == portPtr) + { + errno = EINVAL; + return -1; + } + + char addr[256]; + size_t addrLen = portPtr - addrPtr; + if (addrLen > 255) addrLen = 255; + strncpy(addr, addrPtr, addrLen); + addr[addrLen] = '\0'; + portPtr++; + unsigned short portNumber = atoi(portPtr); + + if (NORM_INSTANCE_INVALID == norm_instance) + { + if (NORM_INSTANCE_INVALID == (norm_instance = NormCreateInstance())) + { + // errno set by whatever caused NormCreateInstance() to fail + return -1; + } + } + + // TBD - What do we use for our local NormNodeId? + // (for now we use automatic, IP addr based assignment or passed in 'id') + // a) Add function to use iface addr + // b) Randomize and implement a NORM session layer + // conflict detection/resolution protocol + + norm_session = NormCreateSession(norm_instance, addr, portNumber, localId); + if (NORM_SESSION_INVALID == norm_session) + { + int savedErrno = errno; + NormDestroyInstance(norm_instance); + norm_instance = NORM_INSTANCE_INVALID; + errno = savedErrno; + return -1; + } + if (NormIsUnicastAddress(addr)) + { + NormSetDefaultUnicastNack(norm_session, true); + } + else + { + // These only apply for multicast sessions + //NormSetTTL(norm_session, options.multicast_hops); // ZMQ default is 1 + NormSetTTL(norm_session, 255); // since the ZMQ_MULTICAST_HOPS socket option isn't well-supported + NormSetRxPortReuse(norm_session, true); // port reuse doesn't work for non-connected unicast + NormSetLoopback(norm_session, true); // needed when multicast users on same machine + if (NULL != ifacePtr) + { + // Note a bad interface may not be caught until sender or receiver start + // (Since sender/receiver is not yet started, this always succeeds here) + NormSetMulticastInterface(norm_session, ifacePtr); + } + } + + if (recv) + { + NormSetDefaultSyncPolicy(norm_session, NORM_SYNC_STREAM); + if (!NormStartReceiver(norm_session, 2*1024*1024)) + { + // errno set by whatever failed + int savedErrno = errno; + NormDestroyInstance(norm_instance); // session gets closed, too + norm_session = NORM_SESSION_INVALID; + norm_instance = NORM_INSTANCE_INVALID; + errno = savedErrno; + return -1; + } + is_receiver = true; + } + + if (send) + { + // Pick a random sender instance id (aka norm sender session id) + NormSessionId instanceId = NormGetRandomSessionId(); + // TBD - provide "options" for some NORM sender parameters + if (!NormStartSender(norm_session, instanceId, 2*1024*1024, 1400, 16, 4)) + { + // errno set by whatever failed + int savedErrno = errno; + NormDestroyInstance(norm_instance); // session gets closed, too + norm_session = NORM_SESSION_INVALID; + norm_instance = NORM_INSTANCE_INVALID; + errno = savedErrno; + return -1; + } + NormSetCongestionControl(norm_session, true); + norm_tx_ready = true; + is_sender = true; + if (NORM_OBJECT_INVALID == (norm_tx_stream = NormStreamOpen(norm_session, 2*1024*1024))) + { + // errno set by whatever failed + int savedErrno = errno; + NormDestroyInstance(norm_instance); // session gets closed, too + norm_session = NORM_SESSION_INVALID; + norm_instance = NORM_INSTANCE_INVALID; + errno = savedErrno; + return -1; + } + } + + return 0; // no error +} // end zmq::norm_engine_t::init() + +void zmq::norm_engine_t::shutdown() +{ + // TBD - implement a more graceful shutdown option + if (is_receiver) + { + NormStopReceiver(norm_session); + + // delete any active NormRxStreamState + rx_pending_list.Destroy(); + rx_ready_list.Destroy(); + msg_ready_list.Destroy(); + + is_receiver = false; + } + if (is_sender) + { + NormStopSender(norm_session); + is_sender = false; + } + if (NORM_SESSION_INVALID != norm_session) + { + NormDestroySession(norm_session); + norm_session = NORM_SESSION_INVALID; + } + if (NORM_INSTANCE_INVALID != norm_instance) + { + NormStopInstance(norm_instance); + NormDestroyInstance(norm_instance); + norm_instance = NORM_INSTANCE_INVALID; + } +} // end zmq::norm_engine_t::shutdown() + +void zmq::norm_engine_t::plug (io_thread_t* io_thread_, session_base_t *session_) +{ + // TBD - we may assign the NORM engine to an io_thread in the future??? + zmq_session = session_; + if (is_sender) zmq_output_ready = true; + if (is_receiver) zmq_input_ready = true; + + fd_t normDescriptor = NormGetDescriptor(norm_instance); + norm_descriptor_handle = add_fd(normDescriptor); + // Set POLLIN for notification of pending NormEvents + set_pollin(norm_descriptor_handle); + + if (is_sender) send_data(); + +} // end zmq::norm_engine_t::init() + +void zmq::norm_engine_t::unplug() +{ + rm_fd(norm_descriptor_handle); + + zmq_session = NULL; +} // end zmq::norm_engine_t::unplug() + +void zmq::norm_engine_t::terminate() +{ + unplug(); + shutdown(); + delete this; +} + +void zmq::norm_engine_t::restart_output() +{ + // There's new message data available from the session + zmq_output_ready = true; + if (norm_tx_ready) send_data(); + +} // end zmq::norm_engine_t::restart_output() + +void zmq::norm_engine_t::send_data() +{ + // Here we write as much as is available or we can + while (zmq_output_ready && norm_tx_ready) + { + // Do we have data in our tx_buffer pending + if (tx_index < tx_len) + { + tx_index += NormStreamWrite(norm_tx_stream, tx_buffer + tx_index, tx_len - tx_index); + if (tx_index < tx_len) + { + // NORM stream buffer full, wait for NORM_TX_QUEUE_VACANCY + norm_tx_ready = false; + break; + } + else if (!zmq_encoder.has_data()) + { + // Buffer contained end of message (should we flush?) + //NormStreamMarkEom(norm_tx_stream); + // Note this makes NORM fairly chatty for low duty cycle messaging + NormStreamFlush(norm_tx_stream, true, NORM_FLUSH_ACTIVE); + } + tx_index = tx_len = 0; // all buffered data was written + } + // Still norm_tx_ready, so ask for more data from zmq_session + if (!zmq_encoder.has_data()) + { + // Existing message had no more data to encode + if (-1 == zmq_session->pull_msg(&tx_msg)) + { + // We need to wait for "restart_output()" to be called by ZMQ + zmq_output_ready = false; + break; + } + zmq_encoder.load_msg(&tx_msg); + // Should we write message size header for NORM to use? Or expect NORM + // receiver to decode ZMQ message framing format(s)? + // OK - we need to use a byte to denote when the ZMQ frame is the _first_ + // frame of a message so it can be decoded properly when a receiver + // 'syncs' mid-stream. We key off the the state of the 'more_flag' + // I.e.,If more_flag _was_ false previously, this is the first + // frame of a ZMQ message. + if (tx_more_bit) + tx_buffer[0] = (char)0xff; // this is not first frame of message + else + tx_buffer[0] = 0x00; // this is first frame of message + tx_more_bit = (0 != (tx_msg.flags() & v2_protocol_t::more_flag)); + tx_len = 1; + } + // Get more data from encoder + size_t space = BUFFER_SIZE - tx_index; + unsigned char* bufPtr = (unsigned char*)(tx_buffer + tx_len); + size_t bytes = zmq_encoder.encode(&bufPtr, space); + tx_len += bytes; + } +} // end zmq::norm_engine_t::send_data() + +void zmq::norm_engine_t::in_event() +{ + // This means a NormEvent is pending, so call NormGetNextEvent() and handle + NormEvent event; + if (!NormGetNextEvent(norm_instance, &event)) + { + // NORM has died before we unplugged?! + zmq_assert(false); + return; + } + + switch(event.type) + { + case NORM_TX_QUEUE_VACANCY: + case NORM_TX_QUEUE_EMPTY: + if (!norm_tx_ready) + { + norm_tx_ready = true; + send_data(); + } + break; + + case NORM_RX_OBJECT_NEW: + break; + case NORM_RX_OBJECT_UPDATED: + recv_data(event.object); + break; + + case NORM_RX_OBJECT_ABORTED: + { + NormRxStreamState* rxState = (NormRxStreamState*)NormObjectGetUserData(event.object); + if (NULL != rxState) + { + // Remove the state from the list it's in + // This is now unnecessary since deletion takes care of list removal + // but in the interest of being clear ... + NormRxStreamState::List* list = rxState->AccessList(); + if (NULL != list) list->Remove(*rxState); + } + delete rxState; + } + break; + + case NORM_REMOTE_SENDER_INACTIVE: + // Here we free resources used for this formerly active sender. + // Note w/ NORM_SYNC_STREAM, if sender reactivates, we may + // get some messages delivered twice. NORM_SYNC_CURRENT would + // mitigate that but might miss data at startup. Always tradeoffs. + // Instead of immediately deleting, we could instead initiate a + // user configurable timeout here to wait some amount of time + // after this event to declare the remote sender truly dead + // and delete its state??? + NormNodeDelete(event.sender); + break; + + default: + // We ignore some NORM events + break; + } +} // zmq::norm_engine_t::in_event() + +void zmq::norm_engine_t::restart_input() +{ + // TBD - should we check/assert that zmq_input_ready was false??? + zmq_input_ready = true; + // Process any pending received messages + if (!msg_ready_list.IsEmpty()) + recv_data(NORM_OBJECT_INVALID); + +} // end zmq::norm_engine_t::restart_input() + +void zmq::norm_engine_t::recv_data(NormObjectHandle object) +{ + if (NORM_OBJECT_INVALID != object) + { + // Call result of NORM_RX_OBJECT_UPDATED notification + // This is a rx_ready indication for a new or existing rx stream + // First, determine if this is a stream we already know + zmq_assert(NORM_OBJECT_STREAM == NormObjectGetType(object)); + // Since there can be multiple senders (publishers), we keep + // state for each separate rx stream. + NormRxStreamState* rxState = (NormRxStreamState*)NormObjectGetUserData(object); + if (NULL == rxState) + { + // This is a new stream, so create rxState with zmq decoder, etc + rxState = new NormRxStreamState(object, options.maxmsgsize); + if (!rxState->Init()) + { + errno_assert(false); + delete rxState; + return; + } + NormObjectSetUserData(object, rxState); + } + else if (!rxState->IsRxReady()) + { + // Existing non-ready stream, so remove from pending + // list to be promoted to rx_ready_list ... + rx_pending_list.Remove(*rxState); + } + if (!rxState->IsRxReady()) + { + // TBD - prepend up front for immediate service? + rxState->SetRxReady(true); + rx_ready_list.Append(*rxState); + } + } + // This loop repeats until we've read all data available from "rx ready" inbound streams + // and pushed any accumulated messages we can up to the zmq session. + while (!rx_ready_list.IsEmpty() || (zmq_input_ready && !msg_ready_list.IsEmpty())) + { + // Iterate through our rx_ready streams, reading data into the decoder + // (This services incoming "rx ready" streams in a round-robin fashion) + NormRxStreamState::List::Iterator iterator(rx_ready_list); + NormRxStreamState* rxState; + while (NULL != (rxState = iterator.GetNextItem())) + { + switch(rxState->Decode()) + { + case 1: // msg completed + // Complete message decoded, move this stream to msg_ready_list + // to push the message up to the session below. Note the stream + // will be returned to the "rx_ready_list" after that's done + rx_ready_list.Remove(*rxState); + msg_ready_list.Append(*rxState); + continue; + + case -1: // decoding error (shouldn't happen w/ NORM, but ...) + // We need to re-sync this stream (decoder buffer was reset) + rxState->SetSync(false); + break; + + default: // 0 - need more data + break; + } + // Get more data from this stream + NormObjectHandle stream = rxState->GetStreamHandle(); + // First, make sure we're in sync ... + while (!rxState->InSync()) + { + // seek NORM message start + if (!NormStreamSeekMsgStart(stream)) + { + // Need to wait for more data + break; + } + // read message 'flag' byte to see if this it's a 'final' frame + char syncFlag; + unsigned int numBytes = 1; + if (!NormStreamRead(stream, &syncFlag, &numBytes)) + { + // broken stream (shouldn't happen after seek msg start?) + zmq_assert(false); + continue; + } + if (0 == numBytes) + { + // This probably shouldn't happen either since we found msg start + // Need to wait for more data + break; + } + if (0 == syncFlag) rxState->SetSync(true); + // else keep seeking ... + } // end while(!rxState->InSync()) + if (!rxState->InSync()) + { + // Need more data for this stream, so remove from "rx ready" + // list and iterate to next "rx ready" stream + rxState->SetRxReady(false); + // Move from rx_ready_list to rx_pending_list + rx_ready_list.Remove(*rxState); + rx_pending_list.Append(*rxState); + continue; + } + // Now we're actually ready to read data from the NORM stream to the zmq_decoder + // the underlying zmq_decoder->get_buffer() call sets how much is needed. + unsigned int numBytes = rxState->GetBytesNeeded(); + if (!NormStreamRead(stream, rxState->AccessBuffer(), &numBytes)) + { + // broken NORM stream, so re-sync + rxState->Init(); // TBD - check result + // This will retry syncing, and getting data from this stream + // since we don't increment the "it" iterator + continue; + } + rxState->IncrementBufferCount(numBytes); + if (0 == numBytes) + { + // All the data available has been read + // Need to wait for NORM_RX_OBJECT_UPDATED for this stream + rxState->SetRxReady(false); + // Move from rx_ready_list to rx_pending_list + rx_ready_list.Remove(*rxState); + rx_pending_list.Append(*rxState); + } + } // end while(NULL != (rxState = iterator.GetNextItem())) + + if (zmq_input_ready) + { + // At this point, we've made a pass through the "rx_ready" stream list + // Now make a pass through the "msg_pending" list (if the zmq session + // ready for more input). This may possibly return streams back to + // the "rx ready" stream list after their pending message is handled + NormRxStreamState::List::Iterator iterator(msg_ready_list); + NormRxStreamState* rxState; + while (NULL != (rxState = iterator.GetNextItem())) + { + msg_t* msg = rxState->AccessMsg(); + int rc = zmq_session->push_msg(msg); + if (-1 == rc) + { + if (EAGAIN == errno) + { + // need to wait until session calls "restart_input()" + zmq_input_ready = false; + break; + } + else + { + // session rejected message? + // TBD - handle this better + zmq_assert(false); + } + } + // else message was accepted. + msg_ready_list.Remove(*rxState); + if (rxState->IsRxReady()) // Move back to "rx_ready" list to read more data + rx_ready_list.Append(*rxState); + else // Move back to "rx_pending" list until NORM_RX_OBJECT_UPDATED + msg_ready_list.Append(*rxState); + } // end while(NULL != (rxState = iterator.GetNextItem())) + } // end if (zmq_input_ready) + } // end while ((!rx_ready_list.empty() || (zmq_input_ready && !msg_ready_list.empty())) + + // Alert zmq of the messages we have pushed up + zmq_session->flush(); + +} // end zmq::norm_engine_t::recv_data() + +zmq::norm_engine_t::NormRxStreamState::NormRxStreamState(NormObjectHandle normStream, + int64_t maxMsgSize) + : norm_stream(normStream), max_msg_size(maxMsgSize), + in_sync(false), rx_ready(false), zmq_decoder(NULL), skip_norm_sync(false), + buffer_ptr(NULL), buffer_size(0), buffer_count(0), + prev(NULL), next(NULL), list(NULL) +{ +} + +zmq::norm_engine_t::NormRxStreamState::~NormRxStreamState() +{ + if (NULL != zmq_decoder) + { + delete zmq_decoder; + zmq_decoder = NULL; + } + if (NULL != list) + { + list->Remove(*this); + list = NULL; + } +} + +bool zmq::norm_engine_t::NormRxStreamState::Init() +{ + in_sync = false; + skip_norm_sync = false; + if (NULL != zmq_decoder) delete zmq_decoder; + // Note "in_batch_size" comes from config.h + zmq_decoder = new (std::nothrow) v2_decoder_t (in_batch_size, max_msg_size); + alloc_assert (zmq_decoder); + if (NULL != zmq_decoder) + { + buffer_count = 0; + buffer_size = 0; + zmq_decoder->get_buffer(&buffer_ptr, &buffer_size); + return true; + } + else + { + return false; + } +} // end zmq::norm_engine_t::NormRxStreamState::Init() + +// This decodes any pending data sitting in our stream decoder buffer +// It returns 1 upon message completion, -1 on error, 1 on msg completion +int zmq::norm_engine_t::NormRxStreamState::Decode() +{ + // If we have pending bytes to decode, process those first + while (buffer_count > 0) + { + // There's pending data for the decoder to decode + size_t processed = 0; + + // This a bit of a kludgy approach used to weed + // out the NORM ZMQ message transport "syncFlag" byte + // from the ZMQ message stream being decoded (but it works!) + if (skip_norm_sync) + { + buffer_ptr++; + buffer_count--; + skip_norm_sync = false; + } + + int rc = zmq_decoder->decode(buffer_ptr, buffer_count, processed); + buffer_ptr += processed; + buffer_count -= processed; + switch (rc) + { + case 1: + // msg completed + if (0 == buffer_count) + { + buffer_size = 0; + zmq_decoder->get_buffer(&buffer_ptr, &buffer_size); + } + skip_norm_sync = true; + return 1; + case -1: + // decoder error (reset decoder and state variables) + in_sync = false; + skip_norm_sync = false; // will get consumed by norm sync check + Init(); + break; + + case 0: + // need more data, keep decoding until buffer exhausted + break; + } + } + // Reset buffer pointer/count for next read + buffer_count = 0; + buffer_size = 0; + zmq_decoder->get_buffer(&buffer_ptr, &buffer_size); + return 0; // need more data + +} // end zmq::norm_engine_t::NormRxStreamState::Decode() + +zmq::norm_engine_t::NormRxStreamState::List::List() + : head(NULL), tail(NULL) +{ +} + +zmq::norm_engine_t::NormRxStreamState::List::~List() +{ + Destroy(); +} + +void zmq::norm_engine_t::NormRxStreamState::List::Destroy() +{ + NormRxStreamState* item = head; + while (NULL != item) + { + Remove(*item); + delete item; + item = head; + } +} // end zmq::norm_engine_t::NormRxStreamState::List::Destroy() + +void zmq::norm_engine_t::NormRxStreamState::List::Append(NormRxStreamState& item) +{ + item.prev = tail; + if (NULL != tail) + tail->next = &item; + else + head = &item; + item.next = NULL; + tail = &item; + item.list = this; +} // end zmq::norm_engine_t::NormRxStreamState::List::Append() + +void zmq::norm_engine_t::NormRxStreamState::List::Remove(NormRxStreamState& item) +{ + if (NULL != item.prev) + item.prev->next = item.next; + else + head = item.next; + if (NULL != item.next) + item.next ->prev = item.prev; + else + tail = item.prev; + item.prev = item.next = NULL; + item.list = NULL; +} // end zmq::norm_engine_t::NormRxStreamState::List::Remove() + +zmq::norm_engine_t::NormRxStreamState::List::Iterator::Iterator(const List& list) + : next_item(list.head) +{ +} + +zmq::norm_engine_t::NormRxStreamState* zmq::norm_engine_t::NormRxStreamState::List::Iterator::GetNextItem() +{ + NormRxStreamState* nextItem = next_item; + if (NULL != nextItem) next_item = nextItem->next; + return nextItem; +} // end zmq::norm_engine_t::NormRxStreamState::List::Iterator::GetNextItem() + + +#endif // ZMQ_HAVE_NORM diff --git a/src/norm_engine.hpp b/src/norm_engine.hpp new file mode 100644 index 00000000..29fbe7fd --- /dev/null +++ b/src/norm_engine.hpp @@ -0,0 +1,189 @@ + +#ifndef __ZMQ_NORM_ENGINE_HPP_INCLUDED__ +#define __ZMQ_NORM_ENGINE_HPP_INCLUDED__ + +#define ZMQ_HAVE_NORM 1 + +#if defined ZMQ_HAVE_NORM + +#include "io_object.hpp" +#include "i_engine.hpp" +#include "options.hpp" +#include "v2_decoder.hpp" +#include "v2_encoder.hpp" + +#include + +namespace zmq +{ + class io_thread_t; + class session_base_t; + + class norm_engine_t : public io_object_t, public i_engine + { + public: + norm_engine_t (zmq::io_thread_t *parent_, const options_t &options_); + ~norm_engine_t (); + + // create NORM instance, session, etc + int init(const char* network_, bool send, bool recv); + void shutdown(); + + // i_engine interface implementation. + // Plug the engine to the session. + virtual void plug (zmq::io_thread_t *io_thread_, + class session_base_t *session_); + + // Terminate and deallocate the engine. Note that 'detached' + // events are not fired on termination. + virtual void terminate (); + + // This method is called by the session to signalise that more + // messages can be written to the pipe. + virtual void restart_input (); + + // This method is called by the session to signalise that there + // are messages to send available. + virtual void restart_output (); + + virtual void zap_msg_available () {}; + + // i_poll_events interface implementation. + // (we only need in_event() for NormEvent notification) + // (i.e., don't have any output events or timers (yet)) + void in_event (); + + private: + void unplug(); + void send_data(); + void recv_data(NormObjectHandle stream); + + + enum {BUFFER_SIZE = 2048}; + + // Used to keep track of streams from multiple senders + class NormRxStreamState + { + public: + NormRxStreamState(NormObjectHandle normStream, + int64_t maxMsgSize); + ~NormRxStreamState(); + + NormObjectHandle GetStreamHandle() const + {return norm_stream;} + + bool Init(); + + void SetRxReady(bool state) + {rx_ready = state;} + bool IsRxReady() const + {return rx_ready;} + + void SetSync(bool state) + {in_sync = state;} + bool InSync() const + {return in_sync;} + + // These are used to feed data to decoder + // and its underlying "msg" buffer + char* AccessBuffer() + {return (char*)(buffer_ptr + buffer_count);} + size_t GetBytesNeeded() const + {return (buffer_size - buffer_count);} + void IncrementBufferCount(size_t count) + {buffer_count += count;} + msg_t* AccessMsg() + {return zmq_decoder->msg();} + // This invokes the decoder "decode" method + // returning 0 if more data is needed, + // 1 if the message is complete, If an error + // occurs the 'sync' is dropped and the + // decoder re-initialized + int Decode(); + + class List + { + public: + List(); + ~List(); + + void Append(NormRxStreamState& item); + void Remove(NormRxStreamState& item); + + bool IsEmpty() const + {return (NULL == head);} + + void Destroy(); + + class Iterator + { + public: + Iterator(const List& list); + NormRxStreamState* GetNextItem(); + private: + NormRxStreamState* next_item; + }; + friend class Iterator; + + private: + NormRxStreamState* head; + NormRxStreamState* tail; + + }; // end class zmq::norm_engine_t::NormRxStreamState::List + + friend class List; + + List* AccessList() + {return list;} + + + private: + NormObjectHandle norm_stream; + int64_t max_msg_size; + bool in_sync; + bool rx_ready; + v2_decoder_t* zmq_decoder; + bool skip_norm_sync; + unsigned char* buffer_ptr; + size_t buffer_size; + size_t buffer_count; + + NormRxStreamState* prev; + NormRxStreamState* next; + NormRxStreamState::List* list; + + }; // end class zmq::norm_engine_t::NormRxStreamState + + session_base_t* zmq_session; + options_t options; + NormInstanceHandle norm_instance; + handle_t norm_descriptor_handle; + NormSessionHandle norm_session; + bool is_sender; + bool is_receiver; + // Sender state + msg_t tx_msg; + v2_encoder_t zmq_encoder; // for tx messages (we use v2 for now) + bool tx_more_bit; + bool zmq_output_ready; // zmq has msg(s) to send + NormObjectHandle norm_tx_stream; + bool norm_tx_ready; // norm has tx queue vacancy + // tbd - maybe don't need buffer if can access encoder buffer directly? + char tx_buffer[BUFFER_SIZE]; + unsigned int tx_index; + unsigned int tx_len; + + // Receiver state + // Lists of norm rx streams from remote senders + bool zmq_input_ready; // zmq ready to receive msg(s) + NormRxStreamState::List rx_pending_list; // rx streams waiting for data reception + NormRxStreamState::List rx_ready_list; // rx streams ready for NormStreamRead() + NormRxStreamState::List msg_ready_list; // rx streams w/ msg ready for push to zmq + + + }; // end class norm_engine_t +} + +#endif // ZMQ_HAVE_NORM + +#endif // !__ZMQ_NORM_ENGINE_HPP_INCLUDED__ diff --git a/src/session_base.cpp b/src/session_base.cpp index 442e05de..7c7650ae 100644 --- a/src/session_base.cpp +++ b/src/session_base.cpp @@ -28,6 +28,7 @@ #include "pgm_sender.hpp" #include "pgm_receiver.hpp" #include "address.hpp" +#include "norm_engine.hpp" #include "ctx.hpp" #include "req.hpp" @@ -449,8 +450,9 @@ void zmq::session_base_t::reconnect () { // For delayed connect situations, terminate the pipe // and reestablish later on - if (pipe && options.immediate == 1 - && addr->protocol != "pgm" && addr->protocol != "epgm") { + if (pipe && 1 == options.immediate == 1 + && addr->protocol != "pgm" && addr->protocol != "epgm" + && addr->protocol != "norm") { pipe->hiccup (); pipe->terminate (false); terminating_pipes.insert (pipe); @@ -549,6 +551,38 @@ void zmq::session_base_t::start_connecting (bool wait_) return; } #endif + +#ifdef ZMQ_HAVE_NORM + if (addr->protocol == "norm") + { + // At this point we'll create message pipes to the session straight + // away. There's no point in delaying it as no concept of 'connect' + // exists with NORM anyway. + if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB) { + + // NORM sender. + norm_engine_t* norm_sender = new (std::nothrow) norm_engine_t(io_thread, options); + alloc_assert (norm_sender); + + int rc = norm_sender->init (addr->address.c_str (), true, false); + errno_assert (rc == 0); + + send_attach (this, norm_sender); + } + else { // ZMQ_SUB or ZMQ_XSUB + + // NORM receiver. + norm_engine_t* norm_receiver = new (std::nothrow) norm_engine_t (io_thread, options); + alloc_assert (norm_receiver); + + int rc = norm_receiver->init (addr->address.c_str (), false, true); + errno_assert (rc == 0); + + send_attach (this, norm_receiver); + } + return; + } +#endif // ZMQ_HAVE_NORM zmq_assert (false); } diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 630e7981..d467ceb8 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -190,11 +190,11 @@ int zmq::socket_base_t::check_protocol (const std::string &protocol_) { // First check out whether the protcol is something we are aware of. if (protocol_ != "inproc" && protocol_ != "ipc" && protocol_ != "tcp" && - protocol_ != "pgm" && protocol_ != "epgm" && protocol_ != "tipc") { + protocol_ != "pgm" && protocol_ != "epgm" && protocol_ != "tipc" && + protocol_ != "norm") { errno = EPROTONOSUPPORT; return -1; } - // If 0MQ is not compiled with OpenPGM, pgm and epgm transports // are not avaialble. #if !defined ZMQ_HAVE_OPENPGM @@ -203,6 +203,13 @@ int zmq::socket_base_t::check_protocol (const std::string &protocol_) return -1; } #endif + +#if !defined ZMQ_HAVE_NORM + if (protocol_ == "norm") { + errno = EPROTONOSUPPORT; + return -1; + } +#endif // !ZMQ_HAVE_NORM // IPC transport is not available on Windows and OpenVMS. #if defined ZMQ_HAVE_WINDOWS || defined ZMQ_HAVE_OPENVMS @@ -224,7 +231,7 @@ int zmq::socket_base_t::check_protocol (const std::string &protocol_) // Check whether socket type and transport protocol match. // Specifically, multicast protocols can't be combined with // bi-directional messaging patterns (socket types). - if ((protocol_ == "pgm" || protocol_ == "epgm") && + if ((protocol_ == "pgm" || protocol_ == "epgm" || protocol_ == "norm") && options.type != ZMQ_PUB && options.type != ZMQ_SUB && options.type != ZMQ_XPUB && options.type != ZMQ_XSUB) { errno = ENOCOMPATPROTO; @@ -362,9 +369,9 @@ int zmq::socket_base_t::bind (const char *addr_) return rc; } - if (protocol == "pgm" || protocol == "epgm") { + if (protocol == "pgm" || protocol == "epgm" || protocol == "norm") { // For convenience's sake, bind can be used interchageable with - // connect for PGM and EPGM transports. + // connect for PGM, EPGM and NORM transports. return connect (addr_); } @@ -600,6 +607,9 @@ int zmq::socket_base_t::connect (const char *addr_) } } #endif + +// TBD - Should we check address for ZMQ_HAVE_NORM??? + #ifdef ZMQ_HAVE_OPENPGM if (protocol == "pgm" || protocol == "epgm") { struct pgm_addrinfo_t *res = NULL; @@ -630,8 +640,8 @@ int zmq::socket_base_t::connect (const char *addr_) errno_assert (session); // PGM does not support subscription forwarding; ask for all data to be - // sent to this pipe. - bool subscribe_to_all = protocol == "pgm" || protocol == "epgm"; + // sent to this pipe. (same for NORM, currently?) + bool subscribe_to_all = protocol == "pgm" || protocol == "epgm" || protocol == "norm"; pipe_t *newpipe = NULL; if (options.immediate != 1 || subscribe_to_all) {