From 0a97e054397182b5d66919870b7b5374676098b7 Mon Sep 17 00:00:00 2001 From: bebopagogo Date: Wed, 19 Mar 2014 15:00:03 -0400 Subject: [PATCH 1/2] removed dependency on deprecated encoder_base::has_data() method --- src/norm_engine.cpp | 118 ++++++++++++++++++++++++++------------------ src/norm_engine.hpp | 5 +- 2 files changed, 74 insertions(+), 49 deletions(-) diff --git a/src/norm_engine.cpp b/src/norm_engine.cpp index 630e7b0a..390a7bf6 100644 --- a/src/norm_engine.cpp +++ b/src/norm_engine.cpp @@ -12,8 +12,9 @@ zmq::norm_engine_t::norm_engine_t(io_thread_t* parent_, : io_object_t(parent_), zmq_session(NULL), options(options_), norm_instance(NORM_INSTANCE_INVALID), norm_session(NORM_SESSION_INVALID), is_sender(false), is_receiver(false), - zmq_encoder(0), tx_more_bit(false), zmq_output_ready(false), - norm_tx_stream(NORM_OBJECT_INVALID), norm_tx_ready(false), + zmq_encoder(0), norm_tx_stream(NORM_OBJECT_INVALID), + tx_first_msg(true), tx_more_bit(false), + zmq_output_ready(false), norm_tx_ready(false), tx_index(0), tx_len(0), zmq_input_ready(false) { @@ -94,8 +95,9 @@ int zmq::norm_engine_t::init(const char* network_, bool send, bool recv) // TBD - What do we use for our local NormNodeId? // (for now we use automatic, IP addr based assignment or passed in 'id') - // a) Add function to use iface addr - // b) Randomize and implement a NORM session layer + // a) Use ZMQ Identity somehow? + // b) Add function to use iface addr + // c) Randomize and implement a NORM session layer // conflict detection/resolution protocol norm_session = NormCreateSession(norm_instance, addr, portNumber, localId); @@ -107,6 +109,7 @@ int zmq::norm_engine_t::init(const char* network_, bool send, bool recv) errno = savedErrno; return -1; } + // There's many other useful NORM options that could be applied here if (NormIsUnicastAddress(addr)) { NormSetDefaultUnicastNack(norm_session, true); @@ -128,6 +131,9 @@ int zmq::norm_engine_t::init(const char* network_, bool send, bool recv) if (recv) { + // The alternative NORM_SYNC_CURRENT here would provide "instant" + // receiver sync to the sender's _current_ message transmission. + // NORM_SYNC_STREAM tries to get everything the sender has cached/buffered NormSetDefaultSyncPolicy(norm_session, NORM_SYNC_STREAM); if (!NormStartReceiver(norm_session, 2*1024*1024)) { @@ -159,7 +165,7 @@ int zmq::norm_engine_t::init(const char* network_, bool send, bool recv) } NormSetCongestionControl(norm_session, true); norm_tx_ready = true; - is_sender = true; + is_sender = true; if (NORM_OBJECT_INVALID == (norm_tx_stream = NormStreamOpen(norm_session, 2*1024*1024))) { // errno set by whatever failed @@ -172,6 +178,10 @@ int zmq::norm_engine_t::init(const char* network_, bool send, bool recv) } } + //NormSetMessageTrace(norm_session, true); + //NormSetDebugLevel(3); + //NormOpenDebugLog(norm_instance, "normLog.txt"); + return 0; // no error } // end zmq::norm_engine_t::init() @@ -250,9 +260,62 @@ void zmq::norm_engine_t::send_data() // Here we write as much as is available or we can while (zmq_output_ready && norm_tx_ready) { + if (0 == tx_len) + { + // Our tx_buffer needs data to send + // Get more data from encoder + size_t space = BUFFER_SIZE; + unsigned char* bufPtr = (unsigned char*)tx_buffer; + tx_len = zmq_encoder.encode(&bufPtr, space); + if (0 == tx_len) + { + if (tx_first_msg) + { + // We don't need to mark eom/flush until a message is sent + tx_first_msg = false; + } + else + { + // A prior message was completely written to stream, so + // mark end-of-message and possibly flush (to force packet transmission, + // even if it's not a full segment so message gets delivered quickly) + // NormStreamMarkEom(norm_tx_stream); // the flush below marks eom + // Note NORM_FLUSH_ACTIVE makes NORM fairly chatty for low duty cycle messaging + // but makes sure content is delivered quickly. Positive acknowledgements + // with flush override would make NORM more succinct here + NormStreamFlush(norm_tx_stream, true, NORM_FLUSH_ACTIVE); + } + // Need to pull and load a new message to send + if (-1 == zmq_session->pull_msg(&tx_msg)) + { + // We need to wait for "restart_output()" to be called by ZMQ + zmq_output_ready = false; + break; + } + zmq_encoder.load_msg(&tx_msg); + // Should we write message size header for NORM to use? Or expect NORM + // receiver to decode ZMQ message framing format(s)? + // OK - we need to use a byte to denote when the ZMQ frame is the _first_ + // frame of a message so it can be decoded properly when a receiver + // 'syncs' mid-stream. We key off the the state of the 'more_flag' + // I.e.,If more_flag _was_ false previously, this is the first + // frame of a ZMQ message. + if (tx_more_bit) + tx_buffer[0] = (char)0xff; // this is not first frame of message + else + tx_buffer[0] = 0x00; // this is first frame of message + tx_more_bit = (0 != (tx_msg.flags() & msg_t::more)); + // Go ahead an get a first chunk of the message + bufPtr++; + space--; + tx_len = 1 + zmq_encoder.encode(&bufPtr, space); + tx_index = 0; + } + } // Do we have data in our tx_buffer pending if (tx_index < tx_len) { + // We have data in our tx_buffer to send, so write it to the stream tx_index += NormStreamWrite(norm_tx_stream, tx_buffer + tx_index, tx_len - tx_index); if (tx_index < tx_len) { @@ -260,48 +323,9 @@ void zmq::norm_engine_t::send_data() norm_tx_ready = false; break; } - else if (!zmq_encoder.has_data()) - { - // Buffer contained end of message (should we flush?) - //NormStreamMarkEom(norm_tx_stream); - // Note this makes NORM fairly chatty for low duty cycle messaging - // but makes sure content is delivered quickly. Positive acknowledgements - // with flush override would make NORM more succinct here - NormStreamFlush(norm_tx_stream, true, NORM_FLUSH_ACTIVE); - } - tx_index = tx_len = 0; // all buffered data was written + tx_len = 0; // all buffered data was written } - // Still norm_tx_ready, so ask for more data from zmq_session - if (!zmq_encoder.has_data()) - { - // Existing message had no more data to encode - if (-1 == zmq_session->pull_msg(&tx_msg)) - { - // We need to wait for "restart_output()" to be called by ZMQ - zmq_output_ready = false; - break; - } - zmq_encoder.load_msg(&tx_msg); - // Should we write message size header for NORM to use? Or expect NORM - // receiver to decode ZMQ message framing format(s)? - // OK - we need to use a byte to denote when the ZMQ frame is the _first_ - // frame of a message so it can be decoded properly when a receiver - // 'syncs' mid-stream. We key off the the state of the 'more_flag' - // I.e.,If more_flag _was_ false previously, this is the first - // frame of a ZMQ message. - if (tx_more_bit) - tx_buffer[0] = (char)0xff; // this is not first frame of message - else - tx_buffer[0] = 0x00; // this is first frame of message - tx_more_bit = (0 != (tx_msg.flags() & v2_protocol_t::more_flag)); - tx_len = 1; - } - // Get more data from encoder - size_t space = BUFFER_SIZE - tx_index; - unsigned char* bufPtr = (unsigned char*)(tx_buffer + tx_len); - size_t bytes = zmq_encoder.encode(&bufPtr, space); - tx_len += bytes; - } + } // end while (zmq_output_ready && norm_tx_ready) } // end zmq::norm_engine_t::send_data() void zmq::norm_engine_t::in_event() @@ -327,7 +351,7 @@ void zmq::norm_engine_t::in_event() break; case NORM_RX_OBJECT_NEW: - break; + //break; case NORM_RX_OBJECT_UPDATED: recv_data(event.object); break; diff --git a/src/norm_engine.hpp b/src/norm_engine.hpp index 4432ce57..72542e19 100644 --- a/src/norm_engine.hpp +++ b/src/norm_engine.hpp @@ -162,11 +162,12 @@ namespace zmq // Sender state msg_t tx_msg; v2_encoder_t zmq_encoder; // for tx messages (we use v2 for now) + NormObjectHandle norm_tx_stream; + bool tx_first_msg; bool tx_more_bit; bool zmq_output_ready; // zmq has msg(s) to send - NormObjectHandle norm_tx_stream; bool norm_tx_ready; // norm has tx queue vacancy - // tbd - maybe don't need buffer if can access encoder buffer directly? + // tbd - maybe don't need buffer if can access zmq message buffer directly? char tx_buffer[BUFFER_SIZE]; unsigned int tx_index; unsigned int tx_len; From cd9755e4aa4e658d533b7326fdf3d8f1671f09d0 Mon Sep 17 00:00:00 2001 From: bebopagogo Date: Wed, 19 Mar 2014 15:02:06 -0400 Subject: [PATCH 2/2] removed norm_engine dependency on deprecated encoder_base::has_data() method --- src/norm_engine.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/src/norm_engine.cpp b/src/norm_engine.cpp index 390a7bf6..23eac066 100644 --- a/src/norm_engine.cpp +++ b/src/norm_engine.cpp @@ -33,6 +33,7 @@ int zmq::norm_engine_t::init(const char* network_, bool send, bool recv) // Parse the "network_" address int "iface", "addr", and "port" // norm endpoint format: [id,][;]: // First, look for optional local NormNodeId + // (default NORM_NODE_ANY causes NORM to use host IP addr for NormNodeId) NormNodeId localId = NORM_NODE_ANY; const char* ifacePtr = strchr(network_, ','); if (NULL != ifacePtr)