mirror of
https://github.com/zeromq/libzmq.git
synced 2025-03-17 00:23:58 +00:00
Merge pull request #938 from bebopagogo/master
norm_engine update with no encoder_base_t::has_data() dependency
This commit is contained in:
commit
faacb1413f
@ -12,8 +12,9 @@ zmq::norm_engine_t::norm_engine_t(io_thread_t* parent_,
|
|||||||
: io_object_t(parent_), zmq_session(NULL), options(options_),
|
: io_object_t(parent_), zmq_session(NULL), options(options_),
|
||||||
norm_instance(NORM_INSTANCE_INVALID), norm_session(NORM_SESSION_INVALID),
|
norm_instance(NORM_INSTANCE_INVALID), norm_session(NORM_SESSION_INVALID),
|
||||||
is_sender(false), is_receiver(false),
|
is_sender(false), is_receiver(false),
|
||||||
zmq_encoder(0), tx_more_bit(false), zmq_output_ready(false),
|
zmq_encoder(0), norm_tx_stream(NORM_OBJECT_INVALID),
|
||||||
norm_tx_stream(NORM_OBJECT_INVALID), norm_tx_ready(false),
|
tx_first_msg(true), tx_more_bit(false),
|
||||||
|
zmq_output_ready(false), norm_tx_ready(false),
|
||||||
tx_index(0), tx_len(0),
|
tx_index(0), tx_len(0),
|
||||||
zmq_input_ready(false)
|
zmq_input_ready(false)
|
||||||
{
|
{
|
||||||
@ -32,6 +33,7 @@ int zmq::norm_engine_t::init(const char* network_, bool send, bool recv)
|
|||||||
// Parse the "network_" address int "iface", "addr", and "port"
|
// Parse the "network_" address int "iface", "addr", and "port"
|
||||||
// norm endpoint format: [id,][<iface>;]<addr>:<port>
|
// norm endpoint format: [id,][<iface>;]<addr>:<port>
|
||||||
// First, look for optional local NormNodeId
|
// First, look for optional local NormNodeId
|
||||||
|
// (default NORM_NODE_ANY causes NORM to use host IP addr for NormNodeId)
|
||||||
NormNodeId localId = NORM_NODE_ANY;
|
NormNodeId localId = NORM_NODE_ANY;
|
||||||
const char* ifacePtr = strchr(network_, ',');
|
const char* ifacePtr = strchr(network_, ',');
|
||||||
if (NULL != ifacePtr)
|
if (NULL != ifacePtr)
|
||||||
@ -94,8 +96,9 @@ int zmq::norm_engine_t::init(const char* network_, bool send, bool recv)
|
|||||||
|
|
||||||
// TBD - What do we use for our local NormNodeId?
|
// TBD - What do we use for our local NormNodeId?
|
||||||
// (for now we use automatic, IP addr based assignment or passed in 'id')
|
// (for now we use automatic, IP addr based assignment or passed in 'id')
|
||||||
// a) Add function to use iface addr
|
// a) Use ZMQ Identity somehow?
|
||||||
// b) Randomize and implement a NORM session layer
|
// b) Add function to use iface addr
|
||||||
|
// c) Randomize and implement a NORM session layer
|
||||||
// conflict detection/resolution protocol
|
// conflict detection/resolution protocol
|
||||||
|
|
||||||
norm_session = NormCreateSession(norm_instance, addr, portNumber, localId);
|
norm_session = NormCreateSession(norm_instance, addr, portNumber, localId);
|
||||||
@ -107,6 +110,7 @@ int zmq::norm_engine_t::init(const char* network_, bool send, bool recv)
|
|||||||
errno = savedErrno;
|
errno = savedErrno;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
// There's many other useful NORM options that could be applied here
|
||||||
if (NormIsUnicastAddress(addr))
|
if (NormIsUnicastAddress(addr))
|
||||||
{
|
{
|
||||||
NormSetDefaultUnicastNack(norm_session, true);
|
NormSetDefaultUnicastNack(norm_session, true);
|
||||||
@ -128,6 +132,9 @@ int zmq::norm_engine_t::init(const char* network_, bool send, bool recv)
|
|||||||
|
|
||||||
if (recv)
|
if (recv)
|
||||||
{
|
{
|
||||||
|
// The alternative NORM_SYNC_CURRENT here would provide "instant"
|
||||||
|
// receiver sync to the sender's _current_ message transmission.
|
||||||
|
// NORM_SYNC_STREAM tries to get everything the sender has cached/buffered
|
||||||
NormSetDefaultSyncPolicy(norm_session, NORM_SYNC_STREAM);
|
NormSetDefaultSyncPolicy(norm_session, NORM_SYNC_STREAM);
|
||||||
if (!NormStartReceiver(norm_session, 2*1024*1024))
|
if (!NormStartReceiver(norm_session, 2*1024*1024))
|
||||||
{
|
{
|
||||||
@ -172,6 +179,10 @@ int zmq::norm_engine_t::init(const char* network_, bool send, bool recv)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//NormSetMessageTrace(norm_session, true);
|
||||||
|
//NormSetDebugLevel(3);
|
||||||
|
//NormOpenDebugLog(norm_instance, "normLog.txt");
|
||||||
|
|
||||||
return 0; // no error
|
return 0; // no error
|
||||||
} // end zmq::norm_engine_t::init()
|
} // end zmq::norm_engine_t::init()
|
||||||
|
|
||||||
@ -250,31 +261,32 @@ void zmq::norm_engine_t::send_data()
|
|||||||
// Here we write as much as is available or we can
|
// Here we write as much as is available or we can
|
||||||
while (zmq_output_ready && norm_tx_ready)
|
while (zmq_output_ready && norm_tx_ready)
|
||||||
{
|
{
|
||||||
// Do we have data in our tx_buffer pending
|
if (0 == tx_len)
|
||||||
if (tx_index < tx_len)
|
|
||||||
{
|
{
|
||||||
tx_index += NormStreamWrite(norm_tx_stream, tx_buffer + tx_index, tx_len - tx_index);
|
// Our tx_buffer needs data to send
|
||||||
if (tx_index < tx_len)
|
// Get more data from encoder
|
||||||
|
size_t space = BUFFER_SIZE;
|
||||||
|
unsigned char* bufPtr = (unsigned char*)tx_buffer;
|
||||||
|
tx_len = zmq_encoder.encode(&bufPtr, space);
|
||||||
|
if (0 == tx_len)
|
||||||
{
|
{
|
||||||
// NORM stream buffer full, wait for NORM_TX_QUEUE_VACANCY
|
if (tx_first_msg)
|
||||||
norm_tx_ready = false;
|
{
|
||||||
break;
|
// We don't need to mark eom/flush until a message is sent
|
||||||
|
tx_first_msg = false;
|
||||||
}
|
}
|
||||||
else if (!zmq_encoder.has_data())
|
else
|
||||||
{
|
{
|
||||||
// Buffer contained end of message (should we flush?)
|
// A prior message was completely written to stream, so
|
||||||
//NormStreamMarkEom(norm_tx_stream);
|
// mark end-of-message and possibly flush (to force packet transmission,
|
||||||
// Note this makes NORM fairly chatty for low duty cycle messaging
|
// even if it's not a full segment so message gets delivered quickly)
|
||||||
|
// NormStreamMarkEom(norm_tx_stream); // the flush below marks eom
|
||||||
|
// Note NORM_FLUSH_ACTIVE makes NORM fairly chatty for low duty cycle messaging
|
||||||
// but makes sure content is delivered quickly. Positive acknowledgements
|
// but makes sure content is delivered quickly. Positive acknowledgements
|
||||||
// with flush override would make NORM more succinct here
|
// with flush override would make NORM more succinct here
|
||||||
NormStreamFlush(norm_tx_stream, true, NORM_FLUSH_ACTIVE);
|
NormStreamFlush(norm_tx_stream, true, NORM_FLUSH_ACTIVE);
|
||||||
}
|
}
|
||||||
tx_index = tx_len = 0; // all buffered data was written
|
// Need to pull and load a new message to send
|
||||||
}
|
|
||||||
// Still norm_tx_ready, so ask for more data from zmq_session
|
|
||||||
if (!zmq_encoder.has_data())
|
|
||||||
{
|
|
||||||
// Existing message had no more data to encode
|
|
||||||
if (-1 == zmq_session->pull_msg(&tx_msg))
|
if (-1 == zmq_session->pull_msg(&tx_msg))
|
||||||
{
|
{
|
||||||
// We need to wait for "restart_output()" to be called by ZMQ
|
// We need to wait for "restart_output()" to be called by ZMQ
|
||||||
@ -293,15 +305,28 @@ void zmq::norm_engine_t::send_data()
|
|||||||
tx_buffer[0] = (char)0xff; // this is not first frame of message
|
tx_buffer[0] = (char)0xff; // this is not first frame of message
|
||||||
else
|
else
|
||||||
tx_buffer[0] = 0x00; // this is first frame of message
|
tx_buffer[0] = 0x00; // this is first frame of message
|
||||||
tx_more_bit = (0 != (tx_msg.flags() & v2_protocol_t::more_flag));
|
tx_more_bit = (0 != (tx_msg.flags() & msg_t::more));
|
||||||
tx_len = 1;
|
// Go ahead an get a first chunk of the message
|
||||||
|
bufPtr++;
|
||||||
|
space--;
|
||||||
|
tx_len = 1 + zmq_encoder.encode(&bufPtr, space);
|
||||||
|
tx_index = 0;
|
||||||
}
|
}
|
||||||
// Get more data from encoder
|
|
||||||
size_t space = BUFFER_SIZE - tx_index;
|
|
||||||
unsigned char* bufPtr = (unsigned char*)(tx_buffer + tx_len);
|
|
||||||
size_t bytes = zmq_encoder.encode(&bufPtr, space);
|
|
||||||
tx_len += bytes;
|
|
||||||
}
|
}
|
||||||
|
// Do we have data in our tx_buffer pending
|
||||||
|
if (tx_index < tx_len)
|
||||||
|
{
|
||||||
|
// We have data in our tx_buffer to send, so write it to the stream
|
||||||
|
tx_index += NormStreamWrite(norm_tx_stream, tx_buffer + tx_index, tx_len - tx_index);
|
||||||
|
if (tx_index < tx_len)
|
||||||
|
{
|
||||||
|
// NORM stream buffer full, wait for NORM_TX_QUEUE_VACANCY
|
||||||
|
norm_tx_ready = false;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
tx_len = 0; // all buffered data was written
|
||||||
|
}
|
||||||
|
} // end while (zmq_output_ready && norm_tx_ready)
|
||||||
} // end zmq::norm_engine_t::send_data()
|
} // end zmq::norm_engine_t::send_data()
|
||||||
|
|
||||||
void zmq::norm_engine_t::in_event()
|
void zmq::norm_engine_t::in_event()
|
||||||
@ -327,7 +352,7 @@ void zmq::norm_engine_t::in_event()
|
|||||||
break;
|
break;
|
||||||
|
|
||||||
case NORM_RX_OBJECT_NEW:
|
case NORM_RX_OBJECT_NEW:
|
||||||
break;
|
//break;
|
||||||
case NORM_RX_OBJECT_UPDATED:
|
case NORM_RX_OBJECT_UPDATED:
|
||||||
recv_data(event.object);
|
recv_data(event.object);
|
||||||
break;
|
break;
|
||||||
|
@ -162,11 +162,12 @@ namespace zmq
|
|||||||
// Sender state
|
// Sender state
|
||||||
msg_t tx_msg;
|
msg_t tx_msg;
|
||||||
v2_encoder_t zmq_encoder; // for tx messages (we use v2 for now)
|
v2_encoder_t zmq_encoder; // for tx messages (we use v2 for now)
|
||||||
|
NormObjectHandle norm_tx_stream;
|
||||||
|
bool tx_first_msg;
|
||||||
bool tx_more_bit;
|
bool tx_more_bit;
|
||||||
bool zmq_output_ready; // zmq has msg(s) to send
|
bool zmq_output_ready; // zmq has msg(s) to send
|
||||||
NormObjectHandle norm_tx_stream;
|
|
||||||
bool norm_tx_ready; // norm has tx queue vacancy
|
bool norm_tx_ready; // norm has tx queue vacancy
|
||||||
// tbd - maybe don't need buffer if can access encoder buffer directly?
|
// tbd - maybe don't need buffer if can access zmq message buffer directly?
|
||||||
char tx_buffer[BUFFER_SIZE];
|
char tx_buffer[BUFFER_SIZE];
|
||||||
unsigned int tx_index;
|
unsigned int tx_index;
|
||||||
unsigned int tx_len;
|
unsigned int tx_len;
|
||||||
|
Loading…
x
Reference in New Issue
Block a user