mirror of
https://github.com/zeromq/libzmq.git
synced 2025-03-17 08:34:00 +00:00
Merge pull request #926 from bebopagogo/master
added uncommitted norm_engine changes
This commit is contained in:
commit
0e0c46aedc
@ -60,7 +60,7 @@ namespace zmq
|
|||||||
{
|
{
|
||||||
free (buf);
|
free (buf);
|
||||||
}
|
}
|
||||||
|
|
||||||
// The function returns a batch of binary data. The data
|
// The function returns a batch of binary data. The data
|
||||||
// are filled to a supplied buffer. If no buffer is supplied (data_
|
// are filled to a supplied buffer. If no buffer is supplied (data_
|
||||||
// points to NULL) decoder object will provide buffer of its own.
|
// points to NULL) decoder object will provide buffer of its own.
|
||||||
|
@ -15,7 +15,7 @@ zmq::norm_engine_t::norm_engine_t(io_thread_t* parent_,
|
|||||||
norm_instance(NORM_INSTANCE_INVALID), norm_session(NORM_SESSION_INVALID),
|
norm_instance(NORM_INSTANCE_INVALID), norm_session(NORM_SESSION_INVALID),
|
||||||
is_sender(false), is_receiver(false),
|
is_sender(false), is_receiver(false),
|
||||||
zmq_encoder(0), tx_more_bit(false), zmq_output_ready(false),
|
zmq_encoder(0), tx_more_bit(false), zmq_output_ready(false),
|
||||||
norm_tx_stream(NORM_OBJECT_INVALID), norm_tx_ready(false),
|
norm_tx_stream(NORM_OBJECT_INVALID), norm_tx_ready(false),
|
||||||
tx_index(0), tx_len(0),
|
tx_index(0), tx_len(0),
|
||||||
zmq_input_ready(false)
|
zmq_input_ready(false)
|
||||||
{
|
{
|
||||||
@ -267,12 +267,14 @@ void zmq::norm_engine_t::send_data()
|
|||||||
// Buffer contained end of message (should we flush?)
|
// Buffer contained end of message (should we flush?)
|
||||||
//NormStreamMarkEom(norm_tx_stream);
|
//NormStreamMarkEom(norm_tx_stream);
|
||||||
// Note this makes NORM fairly chatty for low duty cycle messaging
|
// Note this makes NORM fairly chatty for low duty cycle messaging
|
||||||
|
// but makes sure content is delivered quickly. Positive acknowledgements
|
||||||
|
// with flush override would make NORM more succinct here
|
||||||
NormStreamFlush(norm_tx_stream, true, NORM_FLUSH_ACTIVE);
|
NormStreamFlush(norm_tx_stream, true, NORM_FLUSH_ACTIVE);
|
||||||
}
|
}
|
||||||
tx_index = tx_len = 0; // all buffered data was written
|
tx_index = tx_len = 0; // all buffered data was written
|
||||||
}
|
}
|
||||||
// Still norm_tx_ready, so ask for more data from zmq_session
|
// Still norm_tx_ready, so ask for more data from zmq_session
|
||||||
if (!zmq_encoder.has_data())
|
if (!zmq_encoder.has_data())
|
||||||
{
|
{
|
||||||
// Existing message had no more data to encode
|
// Existing message had no more data to encode
|
||||||
if (-1 == zmq_session->pull_msg(&tx_msg))
|
if (-1 == zmq_session->pull_msg(&tx_msg))
|
||||||
@ -333,20 +335,19 @@ void zmq::norm_engine_t::in_event()
|
|||||||
break;
|
break;
|
||||||
|
|
||||||
case NORM_RX_OBJECT_ABORTED:
|
case NORM_RX_OBJECT_ABORTED:
|
||||||
|
{
|
||||||
|
NormRxStreamState* rxState = (NormRxStreamState*)NormObjectGetUserData(event.object);
|
||||||
|
if (NULL != rxState)
|
||||||
{
|
{
|
||||||
NormRxStreamState* rxState = (NormRxStreamState*)NormObjectGetUserData(event.object);
|
// Remove the state from the list it's in
|
||||||
if (NULL != rxState)
|
// This is now unnecessary since deletion takes care of list removal
|
||||||
{
|
// but in the interest of being clear ...
|
||||||
// Remove the state from the list it's in
|
NormRxStreamState::List* list = rxState->AccessList();
|
||||||
// This is now unnecessary since deletion takes care of list removal
|
if (NULL != list) list->Remove(*rxState);
|
||||||
// but in the interest of being clear ...
|
|
||||||
NormRxStreamState::List* list = rxState->AccessList();
|
|
||||||
if (NULL != list) list->Remove(*rxState);
|
|
||||||
}
|
|
||||||
delete rxState;
|
|
||||||
}
|
}
|
||||||
|
delete rxState;
|
||||||
break;
|
break;
|
||||||
|
}
|
||||||
case NORM_REMOTE_SENDER_INACTIVE:
|
case NORM_REMOTE_SENDER_INACTIVE:
|
||||||
// Here we free resources used for this formerly active sender.
|
// Here we free resources used for this formerly active sender.
|
||||||
// Note w/ NORM_SYNC_STREAM, if sender reactivates, we may
|
// Note w/ NORM_SYNC_STREAM, if sender reactivates, we may
|
||||||
|
@ -2,8 +2,6 @@
|
|||||||
#ifndef __ZMQ_NORM_ENGINE_HPP_INCLUDED__
|
#ifndef __ZMQ_NORM_ENGINE_HPP_INCLUDED__
|
||||||
#define __ZMQ_NORM_ENGINE_HPP_INCLUDED__
|
#define __ZMQ_NORM_ENGINE_HPP_INCLUDED__
|
||||||
|
|
||||||
#define ZMQ_HAVE_NORM 1
|
|
||||||
|
|
||||||
#if defined ZMQ_HAVE_NORM
|
#if defined ZMQ_HAVE_NORM
|
||||||
|
|
||||||
#include "io_object.hpp"
|
#include "io_object.hpp"
|
||||||
@ -12,7 +10,7 @@
|
|||||||
#include "v2_decoder.hpp"
|
#include "v2_decoder.hpp"
|
||||||
#include "v2_encoder.hpp"
|
#include "v2_encoder.hpp"
|
||||||
|
|
||||||
#include <norm/include/normApi.h>
|
#include <normApi.h>
|
||||||
|
|
||||||
namespace zmq
|
namespace zmq
|
||||||
{
|
{
|
||||||
|
Loading…
x
Reference in New Issue
Block a user