mirror of
https://github.com/zeromq/libzmq.git
synced 2025-03-09 15:26:04 +00:00
Merge pull request #626 from dkrikun/master
Add ZMQ_CONFLATE socket option
This commit is contained in:
commit
d485404aab
@ -682,6 +682,22 @@ Default value:: NULL
|
||||
Applicable socket types:: all, when using TCP transport
|
||||
|
||||
|
||||
ZMQ_CONFLATE: Keep only last message
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
If set, a socket shall keep only one message in its inbound/outbound
|
||||
queue, this message being the last message received/the last message
|
||||
to be sent.
|
||||
Ignores 'ZMQ_RECVHWM' and 'ZMQ_SENDHWM' options.
|
||||
Does not supports multi-part messages, in particular, only one part of it
|
||||
is kept in the socket internal queue.
|
||||
[horizontal]
|
||||
Option value type:: int
|
||||
Option value unit:: boolean
|
||||
Default value:: 0 (false)
|
||||
Applicable socket types:: ZMQ_PULL, ZMQ_PUSH, ZMQ_SUB, ZMQ_PUB, ZMQ_DEALER
|
||||
|
||||
|
||||
RETURN VALUE
|
||||
------------
|
||||
The _zmq_setsockopt()_ function shall return zero if successful. Otherwise it
|
||||
|
@ -278,6 +278,7 @@ ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int option, int optval);
|
||||
#define ZMQ_PROBE_ROUTER 51
|
||||
#define ZMQ_REQ_REQUEST_IDS 52
|
||||
#define ZMQ_REQ_STRICT 53
|
||||
#define ZMQ_CONFLATE 54
|
||||
|
||||
/* Message options */
|
||||
#define ZMQ_MORE 1
|
||||
|
@ -88,6 +88,7 @@ libzmq_la_SOURCES = \
|
||||
dealer.hpp \
|
||||
xsub.hpp \
|
||||
ypipe.hpp \
|
||||
ypipe_flat.hpp \
|
||||
yqueue.hpp \
|
||||
z85_codec.hpp \
|
||||
address.cpp \
|
||||
@ -163,7 +164,9 @@ libzmq_la_SOURCES = \
|
||||
raw_decoder.hpp \
|
||||
raw_decoder.cpp \
|
||||
raw_encoder.hpp \
|
||||
raw_encoder.cpp
|
||||
raw_encoder.cpp \
|
||||
ypipe_conflate.hpp \
|
||||
dbuffer.hpp
|
||||
|
||||
if ON_MINGW
|
||||
libzmq_la_LDFLAGS = -no-undefined -avoid-version -version-info @LTVER@ @LIBZMQ_EXTRA_LDFLAGS@
|
||||
|
@ -21,6 +21,7 @@
|
||||
#define __ZMQ_BLOB_HPP_INCLUDED__
|
||||
|
||||
#include <string>
|
||||
#include <string.h>
|
||||
|
||||
// Borrowed from id3lib_strings.h:
|
||||
// They seem to be doing something for MSC, but since I only have gcc, I'll just do that
|
||||
|
134
src/dbuffer.hpp
Normal file
134
src/dbuffer.hpp
Normal file
@ -0,0 +1,134 @@
|
||||
/*
|
||||
Copyright (c) 2007-2013 Contributors as noted in the AUTHORS file
|
||||
|
||||
This file is part of 0MQ.
|
||||
|
||||
0MQ is free software; you can redistribute it and/or modify it under
|
||||
the terms of the GNU Lesser General Public License as published by
|
||||
the Free Software Foundation; either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
0MQ is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU Lesser General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU Lesser General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#ifndef __ZMQ_DBUFFER_HPP_INCLUDED__
|
||||
#define __ZMQ_DBUFFER_HPP_INCLUDED__
|
||||
|
||||
#include <stdlib.h>
|
||||
#include <stddef.h>
|
||||
#include <algorithm>
|
||||
|
||||
#include "mutex.hpp"
|
||||
#include "msg.hpp"
|
||||
|
||||
namespace zmq
|
||||
{
|
||||
|
||||
// dbuffer is a single-producer single-consumer double-buffer
|
||||
// implementation.
|
||||
//
|
||||
// The producer writes to a back buffer and then tries to swap
|
||||
// pointers between the back and front buffers. If it fails,
|
||||
// due to the consumer reading from the front buffer, it just
|
||||
// gives up, which is ok since writes are many and redundant.
|
||||
//
|
||||
// The reader simply reads from the front buffer.
|
||||
//
|
||||
// has_msg keeps track of whether there has been a not yet read
|
||||
// value written, it is used by ypipe_conflate to mimic ypipe
|
||||
// functionality regarding a reader being asleep
|
||||
|
||||
template <typename T> class dbuffer_t;
|
||||
|
||||
template <> class dbuffer_t<msg_t>
|
||||
{
|
||||
public:
|
||||
|
||||
inline dbuffer_t ()
|
||||
: back (&storage[0])
|
||||
, front (&storage[1])
|
||||
, has_msg (false)
|
||||
{
|
||||
back->init ();
|
||||
front->init ();
|
||||
}
|
||||
|
||||
inline ~dbuffer_t()
|
||||
{
|
||||
back->close ();
|
||||
front->close ();
|
||||
}
|
||||
|
||||
inline void write (const msg_t &value_)
|
||||
{
|
||||
msg_t& xvalue = const_cast<msg_t&>(value_);
|
||||
|
||||
zmq_assert (xvalue.check ());
|
||||
back->move (xvalue); // cannot just overwrite, might leak
|
||||
|
||||
zmq_assert (back->check ());
|
||||
|
||||
if (sync.try_lock ())
|
||||
{
|
||||
std::swap (back, front);
|
||||
has_msg = true;
|
||||
|
||||
sync.unlock ();
|
||||
}
|
||||
}
|
||||
|
||||
inline bool read (msg_t *value_)
|
||||
{
|
||||
if (!value_)
|
||||
return false;
|
||||
|
||||
{
|
||||
scoped_lock_t lock (sync);
|
||||
if (!has_msg)
|
||||
return false;
|
||||
|
||||
zmq_assert (front->check ());
|
||||
|
||||
*value_ = *front;
|
||||
front->init (); // avoid double free
|
||||
|
||||
has_msg = false;
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
inline bool check_read ()
|
||||
{
|
||||
scoped_lock_t lock (sync);
|
||||
|
||||
return has_msg;
|
||||
}
|
||||
|
||||
inline bool probe (bool (*fn)(msg_t &))
|
||||
{
|
||||
scoped_lock_t lock (sync);
|
||||
return (*fn) (*front);
|
||||
}
|
||||
|
||||
|
||||
private:
|
||||
msg_t storage[2];
|
||||
msg_t *back, *front;
|
||||
|
||||
mutex_t sync;
|
||||
bool has_msg;
|
||||
|
||||
// Disable copying of dbuffer.
|
||||
dbuffer_t (const dbuffer_t&);
|
||||
const dbuffer_t &operator = (const dbuffer_t&);
|
||||
};
|
||||
}
|
||||
|
||||
#endif
|
@ -50,6 +50,11 @@ namespace zmq
|
||||
EnterCriticalSection (&cs);
|
||||
}
|
||||
|
||||
inline bool try_lock ()
|
||||
{
|
||||
return (bool) TryEnterCriticalSection (&cs);
|
||||
}
|
||||
|
||||
inline void unlock ()
|
||||
{
|
||||
LeaveCriticalSection (&cs);
|
||||
@ -94,6 +99,16 @@ namespace zmq
|
||||
posix_assert (rc);
|
||||
}
|
||||
|
||||
inline bool try_lock ()
|
||||
{
|
||||
int rc = pthread_mutex_trylock (&mutex);
|
||||
if (rc == EBUSY)
|
||||
return false;
|
||||
|
||||
posix_assert (rc);
|
||||
return true;
|
||||
}
|
||||
|
||||
inline void unlock ()
|
||||
{
|
||||
int rc = pthread_mutex_unlock (&mutex);
|
||||
@ -113,4 +128,30 @@ namespace zmq
|
||||
|
||||
#endif
|
||||
|
||||
|
||||
namespace zmq
|
||||
{
|
||||
struct scoped_lock_t
|
||||
{
|
||||
scoped_lock_t (mutex_t& mutex_)
|
||||
: mutex (mutex_)
|
||||
{
|
||||
mutex.lock ();
|
||||
}
|
||||
|
||||
~scoped_lock_t ()
|
||||
{
|
||||
mutex.unlock ();
|
||||
}
|
||||
|
||||
private:
|
||||
|
||||
mutex_t& mutex;
|
||||
|
||||
// Disable copy construction and assignment.
|
||||
scoped_lock_t (const scoped_lock_t&);
|
||||
const scoped_lock_t &operator = (const scoped_lock_t&);
|
||||
};
|
||||
}
|
||||
|
||||
#endif
|
||||
|
@ -54,7 +54,8 @@ zmq::options_t::options_t () :
|
||||
tcp_keepalive_intvl (-1),
|
||||
mechanism (ZMQ_NULL),
|
||||
as_server (0),
|
||||
socket_id (0)
|
||||
socket_id (0),
|
||||
conflate (false)
|
||||
{
|
||||
}
|
||||
|
||||
@ -340,6 +341,16 @@ int zmq::options_t::setsockopt (int option_, const void *optval_,
|
||||
}
|
||||
break;
|
||||
# endif
|
||||
|
||||
case ZMQ_CONFLATE:
|
||||
if (is_int && (value == 0 || value == 1)) {
|
||||
conflate = (value != 0);
|
||||
return 0;
|
||||
}
|
||||
break;
|
||||
|
||||
default:
|
||||
break;
|
||||
}
|
||||
errno = EINVAL;
|
||||
return -1;
|
||||
@ -596,6 +607,14 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_)
|
||||
}
|
||||
break;
|
||||
# endif
|
||||
|
||||
case ZMQ_CONFLATE:
|
||||
if (is_int) {
|
||||
*value = conflate;
|
||||
return 0;
|
||||
}
|
||||
break;
|
||||
|
||||
}
|
||||
errno = EINVAL;
|
||||
return -1;
|
||||
|
@ -142,6 +142,12 @@ namespace zmq
|
||||
|
||||
// ID of the socket.
|
||||
int socket_id;
|
||||
|
||||
// If true, socket conflates outgoing/incoming messages.
|
||||
// Applicable to dealer, push/pull, pub/sub socket types.
|
||||
// Cannot receive multi-part messages.
|
||||
// Ignores hwm
|
||||
bool conflate;
|
||||
};
|
||||
}
|
||||
|
||||
|
42
src/pipe.cpp
42
src/pipe.cpp
@ -23,22 +23,37 @@
|
||||
#include "pipe.hpp"
|
||||
#include "err.hpp"
|
||||
|
||||
#include "ypipe.hpp"
|
||||
#include "ypipe_conflate.hpp"
|
||||
|
||||
int zmq::pipepair (class object_t *parents_ [2], class pipe_t* pipes_ [2],
|
||||
int hwms_ [2], bool delays_ [2])
|
||||
int hwms_ [2], bool delays_ [2], bool conflate_ [2])
|
||||
{
|
||||
// Creates two pipe objects. These objects are connected by two ypipes,
|
||||
// each to pass messages in one direction.
|
||||
|
||||
pipe_t::upipe_t *upipe1 = new (std::nothrow) pipe_t::upipe_t ();
|
||||
typedef ypipe_t <msg_t, message_pipe_granularity> upipe_normal_t;
|
||||
typedef ypipe_conflate_t <msg_t, message_pipe_granularity> upipe_conflate_t;
|
||||
|
||||
pipe_t::upipe_t *upipe1;
|
||||
if(conflate_ [0])
|
||||
upipe1 = new (std::nothrow) upipe_conflate_t ();
|
||||
else
|
||||
upipe1 = new (std::nothrow) upipe_normal_t ();
|
||||
alloc_assert (upipe1);
|
||||
pipe_t::upipe_t *upipe2 = new (std::nothrow) pipe_t::upipe_t ();
|
||||
|
||||
pipe_t::upipe_t *upipe2;
|
||||
if(conflate_ [1])
|
||||
upipe2 = new (std::nothrow) upipe_conflate_t ();
|
||||
else
|
||||
upipe2 = new (std::nothrow) upipe_normal_t ();
|
||||
alloc_assert (upipe2);
|
||||
|
||||
pipes_ [0] = new (std::nothrow) pipe_t (parents_ [0], upipe1, upipe2,
|
||||
hwms_ [1], hwms_ [0], delays_ [0]);
|
||||
hwms_ [1], hwms_ [0], delays_ [0], conflate_ [0]);
|
||||
alloc_assert (pipes_ [0]);
|
||||
pipes_ [1] = new (std::nothrow) pipe_t (parents_ [1], upipe2, upipe1,
|
||||
hwms_ [0], hwms_ [1], delays_ [1]);
|
||||
hwms_ [0], hwms_ [1], delays_ [1], conflate_ [1]);
|
||||
alloc_assert (pipes_ [1]);
|
||||
|
||||
pipes_ [0]->set_peer (pipes_ [1]);
|
||||
@ -48,7 +63,7 @@ int zmq::pipepair (class object_t *parents_ [2], class pipe_t* pipes_ [2],
|
||||
}
|
||||
|
||||
zmq::pipe_t::pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_,
|
||||
int inhwm_, int outhwm_, bool delay_) :
|
||||
int inhwm_, int outhwm_, bool delay_, bool conflate_) :
|
||||
object_t (parent_),
|
||||
inpipe (inpipe_),
|
||||
outpipe (outpipe_),
|
||||
@ -62,7 +77,8 @@ zmq::pipe_t::pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_,
|
||||
peer (NULL),
|
||||
sink (NULL),
|
||||
state (active),
|
||||
delay (delay_)
|
||||
delay (delay_),
|
||||
conflate (conflate_)
|
||||
{
|
||||
}
|
||||
|
||||
@ -303,11 +319,15 @@ void zmq::pipe_t::process_pipe_term_ack ()
|
||||
// First, delete all the unread messages in the pipe. We have to do it by
|
||||
// hand because msg_t doesn't have automatic destructor. Then deallocate
|
||||
// the ypipe itself.
|
||||
|
||||
if (!conflate) {
|
||||
msg_t msg;
|
||||
while (inpipe->read (&msg)) {
|
||||
int rc = msg.close ();
|
||||
errno_assert (rc == 0);
|
||||
}
|
||||
}
|
||||
|
||||
delete inpipe;
|
||||
|
||||
// Deallocate the pipe object
|
||||
@ -439,7 +459,13 @@ void zmq::pipe_t::hiccup ()
|
||||
inpipe = NULL;
|
||||
|
||||
// Create new inpipe.
|
||||
inpipe = new (std::nothrow) pipe_t::upipe_t ();
|
||||
if (conflate)
|
||||
inpipe = new (std::nothrow)
|
||||
ypipe_t <msg_t, message_pipe_granularity> ();
|
||||
else
|
||||
inpipe = new (std::nothrow)
|
||||
ypipe_conflate_t <msg_t, message_pipe_granularity> ();
|
||||
|
||||
alloc_assert (inpipe);
|
||||
in_active = true;
|
||||
|
||||
|
15
src/pipe.hpp
15
src/pipe.hpp
@ -21,7 +21,7 @@
|
||||
#define __ZMQ_PIPE_HPP_INCLUDED__
|
||||
|
||||
#include "msg.hpp"
|
||||
#include "ypipe.hpp"
|
||||
#include "ypipe_base.hpp"
|
||||
#include "config.hpp"
|
||||
#include "object.hpp"
|
||||
#include "stdint.hpp"
|
||||
@ -40,8 +40,10 @@ namespace zmq
|
||||
// Delay specifies how the pipe behaves when the peer terminates. If true
|
||||
// pipe receives all the pending messages before terminating, otherwise it
|
||||
// terminates straight away.
|
||||
// If conflate is true, only the most recently arrived message could be
|
||||
// read (older messages are discarded)
|
||||
int pipepair (zmq::object_t *parents_ [2], zmq::pipe_t* pipes_ [2],
|
||||
int hwms_ [2], bool delays_ [2]);
|
||||
int hwms_ [2], bool delays_ [2], bool conflate_ [2]);
|
||||
|
||||
struct i_pipe_events
|
||||
{
|
||||
@ -65,7 +67,8 @@ namespace zmq
|
||||
{
|
||||
// This allows pipepair to create pipe objects.
|
||||
friend int pipepair (zmq::object_t *parents_ [2],
|
||||
zmq::pipe_t* pipes_ [2], int hwms_ [2], bool delays_ [2]);
|
||||
zmq::pipe_t* pipes_ [2], int hwms_ [2], bool delays_ [2],
|
||||
bool conflate_ [2]);
|
||||
|
||||
public:
|
||||
|
||||
@ -110,7 +113,7 @@ namespace zmq
|
||||
private:
|
||||
|
||||
// Type of the underlying lock-free pipe.
|
||||
typedef ypipe_t <msg_t, message_pipe_granularity> upipe_t;
|
||||
typedef ypipe_base_t <msg_t, message_pipe_granularity> upipe_t;
|
||||
|
||||
// Command handlers.
|
||||
void process_activate_read ();
|
||||
@ -125,7 +128,7 @@ namespace zmq
|
||||
// Constructor is private. Pipe can only be created using
|
||||
// pipepair function.
|
||||
pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_,
|
||||
int inhwm_, int outhwm_, bool delay_);
|
||||
int inhwm_, int outhwm_, bool delay_, bool conflate_);
|
||||
|
||||
// Pipepair uses this function to let us know about
|
||||
// the peer pipe object.
|
||||
@ -196,6 +199,8 @@ namespace zmq
|
||||
// Computes appropriate low watermark from the given high watermark.
|
||||
static int compute_lwm (int hwm_);
|
||||
|
||||
bool conflate;
|
||||
|
||||
// Disable copying.
|
||||
pipe_t (const pipe_t&);
|
||||
const pipe_t &operator = (const pipe_t&);
|
||||
|
@ -300,7 +300,8 @@ int zmq::session_base_t::zap_connect ()
|
||||
pipe_t *new_pipes [2] = {NULL, NULL};
|
||||
int hwms [2] = {0, 0};
|
||||
bool delays [2] = {false, false};
|
||||
int rc = pipepair (parents, new_pipes, hwms, delays);
|
||||
bool conflates [2] = {false, false};
|
||||
int rc = pipepair (parents, new_pipes, hwms, delays, conflates);
|
||||
errno_assert (rc == 0);
|
||||
|
||||
// Attach local end of the pipe to this socket object.
|
||||
@ -331,9 +332,19 @@ void zmq::session_base_t::process_attach (i_engine *engine_)
|
||||
if (!pipe && !is_terminating ()) {
|
||||
object_t *parents [2] = {this, socket};
|
||||
pipe_t *pipes [2] = {NULL, NULL};
|
||||
int hwms [2] = {options.rcvhwm, options.sndhwm};
|
||||
|
||||
bool conflate = options.conflate &&
|
||||
(options.type == ZMQ_DEALER ||
|
||||
options.type == ZMQ_PULL ||
|
||||
options.type == ZMQ_PUSH ||
|
||||
options.type == ZMQ_PUB ||
|
||||
options.type == ZMQ_SUB);
|
||||
|
||||
int hwms [2] = {conflate? -1 : options.rcvhwm,
|
||||
conflate? -1 : options.sndhwm};
|
||||
bool delays [2] = {options.delay_on_close, options.delay_on_disconnect};
|
||||
int rc = pipepair (parents, pipes, hwms, delays);
|
||||
bool conflates [2] = {conflate, conflate};
|
||||
int rc = pipepair (parents, pipes, hwms, delays, conflates);
|
||||
errno_assert (rc == 0);
|
||||
|
||||
// Plug the local end of the pipe.
|
||||
|
@ -450,9 +450,18 @@ int zmq::socket_base_t::connect (const char *addr_)
|
||||
// Create a bi-directional pipe to connect the peers.
|
||||
object_t *parents [2] = {this, peer.socket};
|
||||
pipe_t *new_pipes [2] = {NULL, NULL};
|
||||
int hwms [2] = {sndhwm, rcvhwm};
|
||||
|
||||
bool conflate = options.conflate &&
|
||||
(options.type == ZMQ_DEALER ||
|
||||
options.type == ZMQ_PULL ||
|
||||
options.type == ZMQ_PUSH ||
|
||||
options.type == ZMQ_PUB ||
|
||||
options.type == ZMQ_SUB);
|
||||
|
||||
int hwms [2] = {conflate? -1 : sndhwm, conflate? -1 : rcvhwm};
|
||||
bool delays [2] = {options.delay_on_disconnect, options.delay_on_close};
|
||||
int rc = pipepair (parents, new_pipes, hwms, delays);
|
||||
bool conflates [2] = {conflate, conflate};
|
||||
int rc = pipepair (parents, new_pipes, hwms, delays, conflates);
|
||||
errno_assert (rc == 0);
|
||||
|
||||
// Attach local end of the pipe to this socket object.
|
||||
@ -554,9 +563,19 @@ int zmq::socket_base_t::connect (const char *addr_)
|
||||
// Create a bi-directional pipe.
|
||||
object_t *parents [2] = {this, session};
|
||||
pipe_t *new_pipes [2] = {NULL, NULL};
|
||||
int hwms [2] = {options.sndhwm, options.rcvhwm};
|
||||
|
||||
bool conflate = options.conflate &&
|
||||
(options.type == ZMQ_DEALER ||
|
||||
options.type == ZMQ_PULL ||
|
||||
options.type == ZMQ_PUSH ||
|
||||
options.type == ZMQ_PUB ||
|
||||
options.type == ZMQ_SUB);
|
||||
|
||||
int hwms [2] = {conflate? -1 : options.sndhwm,
|
||||
conflate? -1 : options.rcvhwm};
|
||||
bool delays [2] = {options.delay_on_disconnect, options.delay_on_close};
|
||||
rc = pipepair (parents, new_pipes, hwms, delays);
|
||||
bool conflates [2] = {conflate, conflate};
|
||||
rc = pipepair (parents, new_pipes, hwms, delays, conflates);
|
||||
errno_assert (rc == 0);
|
||||
|
||||
// Attach local end of the pipe to the socket object.
|
||||
|
@ -23,6 +23,7 @@
|
||||
#include "atomic_ptr.hpp"
|
||||
#include "yqueue.hpp"
|
||||
#include "platform.hpp"
|
||||
#include "ypipe_base.hpp"
|
||||
|
||||
namespace zmq
|
||||
{
|
||||
@ -34,7 +35,7 @@ namespace zmq
|
||||
// N is granularity of the pipe, i.e. how many items are needed to
|
||||
// perform next memory allocation.
|
||||
|
||||
template <typename T, int N> class ypipe_t
|
||||
template <typename T, int N> class ypipe_t : public ypipe_base_t<T,N>
|
||||
{
|
||||
public:
|
||||
|
||||
|
44
src/ypipe_base.hpp
Normal file
44
src/ypipe_base.hpp
Normal file
@ -0,0 +1,44 @@
|
||||
|
||||
/*
|
||||
Copyright (c) 2007-2013 Contributors as noted in the AUTHORS file
|
||||
|
||||
This file is part of 0MQ.
|
||||
|
||||
0MQ is free software; you can redistribute it and/or modify it under
|
||||
the terms of the GNU Lesser General Public License as published by
|
||||
the Free Software Foundation; either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
0MQ is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU Lesser General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU Lesser General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#ifndef __ZMQ_YPIPE_BASE_HPP_INCLUDED__
|
||||
#define __ZMQ_YPIPE_BASE_HPP_INCLUDED__
|
||||
|
||||
|
||||
namespace zmq
|
||||
{
|
||||
// ypipe_base abstracts ypipe and ypipe_conflate specific
|
||||
// classes, one is selected according to a the conflate
|
||||
// socket option
|
||||
|
||||
template <typename T, int N> class ypipe_base_t
|
||||
{
|
||||
public:
|
||||
virtual ~ypipe_base_t () {}
|
||||
virtual void write (const T &value_, bool incomplete_) = 0;
|
||||
virtual bool unwrite (T *value_) = 0;
|
||||
virtual bool flush () = 0;
|
||||
virtual bool check_read () = 0;
|
||||
virtual bool read (T *value_) = 0;
|
||||
virtual bool probe (bool (*fn)(T &)) = 0;
|
||||
};
|
||||
}
|
||||
|
||||
#endif
|
127
src/ypipe_conflate.hpp
Normal file
127
src/ypipe_conflate.hpp
Normal file
@ -0,0 +1,127 @@
|
||||
/*
|
||||
Copyright (c) 2007-2013 Contributors as noted in the AUTHORS file
|
||||
|
||||
This file is part of 0MQ.
|
||||
|
||||
0MQ is free software; you can redistribute it and/or modify it under
|
||||
the terms of the GNU Lesser General Public License as published by
|
||||
the Free Software Foundation; either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
0MQ is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU Lesser General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU Lesser General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#ifndef __ZMQ_YPIPE_CONFLATE_HPP_INCLUDED__
|
||||
#define __ZMQ_YPIPE_CONFLATE_HPP_INCLUDED__
|
||||
|
||||
#include "platform.hpp"
|
||||
#include "dbuffer.hpp"
|
||||
#include "ypipe_base.hpp"
|
||||
|
||||
namespace zmq
|
||||
{
|
||||
|
||||
// Adapter for dbuffer, to plug it in instead of a queue for the sake
|
||||
// of implementing the conflate socket option, which, if set, makes
|
||||
// the receiving side to discard all incoming messages but the last one.
|
||||
//
|
||||
// reader_awake flag is needed here to mimic ypipe delicate behaviour
|
||||
// around the reader being asleep (see 'c' pointer being NULL in ypipe.hpp)
|
||||
|
||||
template <typename T, int N> class ypipe_conflate_t : public ypipe_base_t<T,N>
|
||||
{
|
||||
public:
|
||||
|
||||
// Initialises the pipe.
|
||||
inline ypipe_conflate_t ()
|
||||
: reader_awake(false)
|
||||
{
|
||||
}
|
||||
|
||||
// The destructor doesn't have to be virtual. It is mad virtual
|
||||
// just to keep ICC and code checking tools from complaining.
|
||||
inline virtual ~ypipe_conflate_t ()
|
||||
{
|
||||
}
|
||||
|
||||
// Following function (write) deliberately copies uninitialised data
|
||||
// when used with zmq_msg. Initialising the VSM body for
|
||||
// non-VSM messages won't be good for performance.
|
||||
|
||||
#ifdef ZMQ_HAVE_OPENVMS
|
||||
#pragma message save
|
||||
#pragma message disable(UNINIT)
|
||||
#endif
|
||||
inline void write (const T &value_, bool incomplete_)
|
||||
{
|
||||
(void)incomplete_;
|
||||
|
||||
dbuffer.write (value_);
|
||||
}
|
||||
|
||||
#ifdef ZMQ_HAVE_OPENVMS
|
||||
#pragma message restore
|
||||
#endif
|
||||
|
||||
// There are no incomplete items for conflate ypipe
|
||||
inline bool unwrite (T *value_)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
// Flush is no-op for conflate ypipe. Reader asleep behaviour
|
||||
// is as of the usual ypipe.
|
||||
// Returns false if the reader thread is sleeping. In that case,
|
||||
// caller is obliged to wake the reader up before using the pipe again.
|
||||
inline bool flush ()
|
||||
{
|
||||
return reader_awake;
|
||||
}
|
||||
|
||||
// Check whether item is available for reading.
|
||||
inline bool check_read ()
|
||||
{
|
||||
bool res = dbuffer.check_read ();
|
||||
if (!res)
|
||||
reader_awake = false;
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
// Reads an item from the pipe. Returns false if there is no value.
|
||||
// available.
|
||||
inline bool read (T *value_)
|
||||
{
|
||||
if (!check_read ())
|
||||
return false;
|
||||
|
||||
return dbuffer.read (value_);
|
||||
}
|
||||
|
||||
// Applies the function fn to the first elemenent in the pipe
|
||||
// and returns the value returned by the fn.
|
||||
// The pipe mustn't be empty or the function crashes.
|
||||
inline bool probe (bool (*fn)(T &))
|
||||
{
|
||||
return dbuffer.probe (fn);
|
||||
}
|
||||
|
||||
protected:
|
||||
|
||||
dbuffer_t <T> dbuffer;
|
||||
bool reader_awake;
|
||||
|
||||
// Disable copying of ypipe object.
|
||||
ypipe_conflate_t (const ypipe_conflate_t&);
|
||||
const ypipe_conflate_t &operator = (const ypipe_conflate_t&);
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
#endif
|
@ -31,7 +31,8 @@ noinst_PROGRAMS = test_pair_inproc \
|
||||
test_spec_router \
|
||||
test_spec_pushpull \
|
||||
test_req_request_ids \
|
||||
test_req_strict
|
||||
test_req_strict \
|
||||
test_conflate
|
||||
|
||||
if !ON_MINGW
|
||||
noinst_PROGRAMS += test_shutdown_stress \
|
||||
@ -69,6 +70,7 @@ test_spec_router_SOURCES = test_spec_router.cpp
|
||||
test_spec_pushpull_SOURCES = test_spec_pushpull.cpp
|
||||
test_req_request_ids_SOURCES = test_req_request_ids.cpp
|
||||
test_req_strict_SOURCES = test_req_strict.cpp
|
||||
test_conflate_SOURCES = test_conflate.cpp
|
||||
if !ON_MINGW
|
||||
test_shutdown_stress_SOURCES = test_shutdown_stress.cpp
|
||||
test_pair_ipc_SOURCES = test_pair_ipc.cpp testutil.hpp
|
||||
|
77
tests/test_conflate.cpp
Normal file
77
tests/test_conflate.cpp
Normal file
@ -0,0 +1,77 @@
|
||||
/*
|
||||
Copyright (c) 2007-2013 Contributors as noted in the AUTHORS file
|
||||
|
||||
This file is part of 0MQ.
|
||||
|
||||
0MQ is free software; you can redistribute it and/or modify it under
|
||||
the terms of the GNU Lesser General Public License as published by
|
||||
the Free Software Foundation; either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
0MQ is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU Lesser General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU Lesser General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include <stdio.h>
|
||||
#include "../include/zmq_utils.h"
|
||||
#include "testutil.hpp"
|
||||
|
||||
int main (int argc, char *argv [])
|
||||
{
|
||||
const char *bind_to = "tcp://127.0.0.1:77008";
|
||||
|
||||
int rc;
|
||||
|
||||
void* ctx = zmq_init (1);
|
||||
assert (ctx);
|
||||
|
||||
void* s_in = zmq_socket (ctx, ZMQ_PULL);
|
||||
assert (s_in);
|
||||
|
||||
int conflate = 1;
|
||||
rc = zmq_setsockopt (s_in, ZMQ_CONFLATE, &conflate, sizeof(conflate));
|
||||
assert (rc == 0);
|
||||
|
||||
rc = zmq_bind (s_in, bind_to);
|
||||
assert (rc == 0);
|
||||
|
||||
void* s_out = zmq_socket (ctx, ZMQ_PUSH);
|
||||
assert (s_out);
|
||||
|
||||
rc = zmq_connect (s_out, bind_to);
|
||||
assert (rc == 0);
|
||||
|
||||
int message_count = 20;
|
||||
|
||||
for (int j = 0; j < message_count; ++j) {
|
||||
rc = zmq_send(s_out, (void*)&j, sizeof(int), 0);
|
||||
if (rc < 0) {
|
||||
printf ("error in zmq_sendmsg: %s\n", zmq_strerror (errno));
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
zmq_sleep (1);
|
||||
|
||||
int payload_recved = 0;
|
||||
rc = zmq_recv(s_in, (void*)&payload_recved, sizeof(int), 0);
|
||||
assert (rc > 0);
|
||||
assert (payload_recved == message_count - 1);
|
||||
|
||||
|
||||
rc = zmq_close (s_in);
|
||||
assert (rc == 0);
|
||||
|
||||
rc = zmq_close (s_out);
|
||||
assert (rc == 0);
|
||||
|
||||
rc = zmq_term (ctx);
|
||||
assert (rc == 0);
|
||||
|
||||
return 0;
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user