From 503da83fceac8c2d6c37ff6fbfac0c4c31e59a91 Mon Sep 17 00:00:00 2001 From: danielkr Date: Sat, 17 Aug 2013 22:53:02 +0300 Subject: [PATCH 1/9] Add #include to string.h in blob.hpp Required for memmove(), memcpy() --- src/blob.hpp | 1 + 1 file changed, 1 insertion(+) diff --git a/src/blob.hpp b/src/blob.hpp index 8f4d2d82..65d305b3 100644 --- a/src/blob.hpp +++ b/src/blob.hpp @@ -21,6 +21,7 @@ #define __ZMQ_BLOB_HPP_INCLUDED__ #include +#include // Borrowed from id3lib_strings.h: // They seem to be doing something for MSC, but since I only have gcc, I'll just do that From 87c84a252abc59bf34895a42f799278f2f9e478f Mon Sep 17 00:00:00 2001 From: danielkr Date: Sat, 17 Aug 2013 22:54:29 +0300 Subject: [PATCH 2/9] Add try_lock() to mutex_t --- src/mutex.hpp | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/src/mutex.hpp b/src/mutex.hpp index 354df78b..6533450a 100644 --- a/src/mutex.hpp +++ b/src/mutex.hpp @@ -50,6 +50,11 @@ namespace zmq EnterCriticalSection (&cs); } + inline bool try_lock () + { + return (bool) TryEnterCriticalSection (&cs); + } + inline void unlock () { LeaveCriticalSection (&cs); @@ -94,6 +99,16 @@ namespace zmq posix_assert (rc); } + inline bool try_lock () + { + int rc = pthread_mutex_trylock (&mutex); + if (rc == EBUSY) + return false; + + posix_assert (rc); + return true; + } + inline void unlock () { int rc = pthread_mutex_unlock (&mutex); From 4eac7e3e4fc443d3ad058d245f88f52ae7a1a817 Mon Sep 17 00:00:00 2001 From: danielkr Date: Sat, 17 Aug 2013 22:55:04 +0300 Subject: [PATCH 3/9] Add scoped_lock_t syntactic sugar --- src/mutex.hpp | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/src/mutex.hpp b/src/mutex.hpp index 6533450a..308f5eff 100644 --- a/src/mutex.hpp +++ b/src/mutex.hpp @@ -128,4 +128,30 @@ namespace zmq #endif + +namespace zmq +{ + struct scoped_lock_t + { + scoped_lock_t (mutex_t& mutex_) + : mutex (mutex_) + { + mutex.lock (); + } + + ~scoped_lock_t () + { + mutex.unlock (); + } + + private: + + mutex_t& mutex; + + // Disable copy construction and assignment. + scoped_lock_t (const scoped_lock_t&); + const scoped_lock_t &operator = (const scoped_lock_t&); + }; +} + #endif From d020dd677f9c8c4f89f244b19f1b809fb9b3a635 Mon Sep 17 00:00:00 2001 From: danielkr Date: Sat, 17 Aug 2013 22:59:07 +0300 Subject: [PATCH 4/9] Declare ZMQ_CONFLATE option --- include/zmq.h | 1 + src/options.cpp | 21 ++++++++++++++++++++- src/options.hpp | 6 ++++++ 3 files changed, 27 insertions(+), 1 deletion(-) diff --git a/include/zmq.h b/include/zmq.h index d1b13709..3ecef1d2 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -278,6 +278,7 @@ ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int option, int optval); #define ZMQ_PROBE_ROUTER 51 #define ZMQ_REQ_REQUEST_IDS 52 #define ZMQ_REQ_STRICT 53 +#define ZMQ_CONFLATE 54 /* Message options */ #define ZMQ_MORE 1 diff --git a/src/options.cpp b/src/options.cpp index f1767437..010222c8 100644 --- a/src/options.cpp +++ b/src/options.cpp @@ -54,7 +54,8 @@ zmq::options_t::options_t () : tcp_keepalive_intvl (-1), mechanism (ZMQ_NULL), as_server (0), - socket_id (0) + socket_id (0), + conflate (false) { } @@ -340,6 +341,16 @@ int zmq::options_t::setsockopt (int option_, const void *optval_, } break; # endif + + case ZMQ_CONFLATE: + if (is_int && (value == 0 || value == 1)) { + conflate = (value != 0); + return 0; + } + break; + + default: + break; } errno = EINVAL; return -1; @@ -596,6 +607,14 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_) } break; # endif + + case ZMQ_CONFLATE: + if (is_int) { + *value = conflate; + return 0; + } + break; + } errno = EINVAL; return -1; diff --git a/src/options.hpp b/src/options.hpp index cf5fbbd7..358b2858 100644 --- a/src/options.hpp +++ b/src/options.hpp @@ -142,6 +142,12 @@ namespace zmq // ID of the socket. int socket_id; + + // If true, socket conflates outgoing/incoming messages. + // Applicable to dealer, push/pull, pub/sub socket types. + // Cannot receive multi-part messages. + // Ignores hwm + bool conflate; }; } From 4c35b88acbc7d1237e86833253737632cfd6a216 Mon Sep 17 00:00:00 2001 From: danielkr Date: Sat, 17 Aug 2013 23:00:46 +0300 Subject: [PATCH 5/9] Implement double buffer for conflate option Add simple double buffer implementation tailored to handle msg_t, i.e invoke msg_t::close instead of destructor and so on. Seems to me mutex is good enough at this point. --- src/dbuffer.hpp | 134 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 134 insertions(+) create mode 100644 src/dbuffer.hpp diff --git a/src/dbuffer.hpp b/src/dbuffer.hpp new file mode 100644 index 00000000..7929304d --- /dev/null +++ b/src/dbuffer.hpp @@ -0,0 +1,134 @@ +/* + Copyright (c) 2007-2013 Contributors as noted in the AUTHORS file + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#ifndef __ZMQ_DBUFFER_HPP_INCLUDED__ +#define __ZMQ_DBUFFER_HPP_INCLUDED__ + +#include +#include +#include + +#include "mutex.hpp" +#include "msg.hpp" + +namespace zmq +{ + + // dbuffer is a single-producer single-consumer double-buffer + // implementation. + // + // The producer writes to a back buffer and then tries to swap + // pointers between the back and front buffers. If it fails, + // due to the consumer reading from the front buffer, it just + // gives up, which is ok since writes are many and redundant. + // + // The reader simply reads from the front buffer. + // + // has_msg keeps track of whether there has been a not yet read + // value written, it is used by ypipe_conflate to mimic ypipe + // functionality regarding a reader being asleep + + template class dbuffer_t; + + template <> class dbuffer_t + { + public: + + inline dbuffer_t () + : back (&storage[0]) + , front (&storage[1]) + , has_msg (false) + { + back->init (); + front->init (); + } + + inline ~dbuffer_t() + { + back->close (); + front->close (); + } + + inline void write (const msg_t &value_) + { + msg_t& xvalue = const_cast(value_); + + zmq_assert (xvalue.check ()); + back->move (xvalue); // cannot just overwrite, might leak + + zmq_assert (back->check ()); + + if (sync.try_lock ()) + { + std::swap (back, front); + has_msg = true; + + sync.unlock (); + } + } + + inline bool read (msg_t *value_) + { + if (!value_) + return false; + + { + scoped_lock_t lock (sync); + if (!has_msg) + return false; + + zmq_assert (front->check ()); + + *value_ = *front; + front->init (); // avoid double free + + has_msg = false; + return true; + } + } + + + inline bool check_read () + { + scoped_lock_t lock (sync); + + return has_msg; + } + + inline bool probe (bool (*fn)(msg_t &)) + { + scoped_lock_t lock (sync); + return (*fn) (*front); + } + + + private: + msg_t storage[2]; + msg_t *back, *front; + + mutex_t sync; + bool has_msg; + + // Disable copying of dbuffer. + dbuffer_t (const dbuffer_t&); + const dbuffer_t &operator = (const dbuffer_t&); + }; +} + +#endif From daa7a8021f6802d12d8fcfd771dda3101d192f15 Mon Sep 17 00:00:00 2001 From: danielkr Date: Sat, 17 Aug 2013 23:08:07 +0300 Subject: [PATCH 6/9] Plug in dbuffer to serve the ZMQ_CONFLATE option ZMQ_CONFLATE option is passed to pipepair() which creates a usual ypipe_t or ypipe_conflate_t and plugs it into pipe_t under a common abstract base. --- src/Makefile.am | 5 +- src/pipe.cpp | 51 +++++++++++++---- src/pipe.hpp | 15 +++-- src/session_base.cpp | 17 +++++- src/socket_base.cpp | 27 +++++++-- src/ypipe.hpp | 3 +- src/ypipe_base.hpp | 44 ++++++++++++++ src/ypipe_conflate.hpp | 127 +++++++++++++++++++++++++++++++++++++++++ 8 files changed, 263 insertions(+), 26 deletions(-) create mode 100644 src/ypipe_base.hpp create mode 100644 src/ypipe_conflate.hpp diff --git a/src/Makefile.am b/src/Makefile.am index 62a8ba08..57bd08a4 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -88,6 +88,7 @@ libzmq_la_SOURCES = \ dealer.hpp \ xsub.hpp \ ypipe.hpp \ + ypipe_flat.hpp \ yqueue.hpp \ z85_codec.hpp \ address.cpp \ @@ -163,7 +164,9 @@ libzmq_la_SOURCES = \ raw_decoder.hpp \ raw_decoder.cpp \ raw_encoder.hpp \ - raw_encoder.cpp + raw_encoder.cpp \ + ypipe_conflate.hpp \ + dbuffer.hpp if ON_MINGW libzmq_la_LDFLAGS = -no-undefined -avoid-version -version-info @LTVER@ @LIBZMQ_EXTRA_LDFLAGS@ diff --git a/src/pipe.cpp b/src/pipe.cpp index e4b841ad..6bc7addd 100644 --- a/src/pipe.cpp +++ b/src/pipe.cpp @@ -23,22 +23,37 @@ #include "pipe.hpp" #include "err.hpp" +#include "ypipe.hpp" +#include "ypipe_conflate.hpp" + int zmq::pipepair (class object_t *parents_ [2], class pipe_t* pipes_ [2], - int hwms_ [2], bool delays_ [2]) + int hwms_ [2], bool delays_ [2], bool conflate_ [2]) { // Creates two pipe objects. These objects are connected by two ypipes, // each to pass messages in one direction. - pipe_t::upipe_t *upipe1 = new (std::nothrow) pipe_t::upipe_t (); + typedef ypipe_t upipe_normal_t; + typedef ypipe_conflate_t upipe_conflate_t; + + pipe_t::upipe_t *upipe1; + if(conflate_ [0]) + upipe1 = new (std::nothrow) upipe_conflate_t (); + else + upipe1 = new (std::nothrow) upipe_normal_t (); alloc_assert (upipe1); - pipe_t::upipe_t *upipe2 = new (std::nothrow) pipe_t::upipe_t (); + + pipe_t::upipe_t *upipe2; + if(conflate_ [1]) + upipe2 = new (std::nothrow) upipe_conflate_t (); + else + upipe2 = new (std::nothrow) upipe_normal_t (); alloc_assert (upipe2); pipes_ [0] = new (std::nothrow) pipe_t (parents_ [0], upipe1, upipe2, - hwms_ [1], hwms_ [0], delays_ [0]); + hwms_ [1], hwms_ [0], delays_ [0], conflate_ [0]); alloc_assert (pipes_ [0]); pipes_ [1] = new (std::nothrow) pipe_t (parents_ [1], upipe2, upipe1, - hwms_ [0], hwms_ [1], delays_ [1]); + hwms_ [0], hwms_ [1], delays_ [1], conflate_ [1]); alloc_assert (pipes_ [1]); pipes_ [0]->set_peer (pipes_ [1]); @@ -48,7 +63,7 @@ int zmq::pipepair (class object_t *parents_ [2], class pipe_t* pipes_ [2], } zmq::pipe_t::pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_, - int inhwm_, int outhwm_, bool delay_) : + int inhwm_, int outhwm_, bool delay_, bool conflate_) : object_t (parent_), inpipe (inpipe_), outpipe (outpipe_), @@ -62,7 +77,8 @@ zmq::pipe_t::pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_, peer (NULL), sink (NULL), state (active), - delay (delay_) + delay (delay_), + conflate (conflate_) { } @@ -303,11 +319,16 @@ void zmq::pipe_t::process_pipe_term_ack () // First, delete all the unread messages in the pipe. We have to do it by // hand because msg_t doesn't have automatic destructor. Then deallocate // the ypipe itself. - msg_t msg; - while (inpipe->read (&msg)) { - int rc = msg.close (); - errno_assert (rc == 0); + + if (!conflate) + { + msg_t msg; + while (inpipe->read (&msg)) { + int rc = msg.close (); + errno_assert (rc == 0); + } } + delete inpipe; // Deallocate the pipe object @@ -439,7 +460,13 @@ void zmq::pipe_t::hiccup () inpipe = NULL; // Create new inpipe. - inpipe = new (std::nothrow) pipe_t::upipe_t (); + if (conflate) + inpipe = new (std::nothrow) + ypipe_t (); + else + inpipe = new (std::nothrow) + ypipe_conflate_t (); + alloc_assert (inpipe); in_active = true; diff --git a/src/pipe.hpp b/src/pipe.hpp index d329b582..5405110c 100644 --- a/src/pipe.hpp +++ b/src/pipe.hpp @@ -21,7 +21,7 @@ #define __ZMQ_PIPE_HPP_INCLUDED__ #include "msg.hpp" -#include "ypipe.hpp" +#include "ypipe_base.hpp" #include "config.hpp" #include "object.hpp" #include "stdint.hpp" @@ -40,8 +40,10 @@ namespace zmq // Delay specifies how the pipe behaves when the peer terminates. If true // pipe receives all the pending messages before terminating, otherwise it // terminates straight away. + // If conflate is true, only the most recently arrived message could be + // read (older messages are discarded) int pipepair (zmq::object_t *parents_ [2], zmq::pipe_t* pipes_ [2], - int hwms_ [2], bool delays_ [2]); + int hwms_ [2], bool delays_ [2], bool conflate_ [2]); struct i_pipe_events { @@ -65,7 +67,8 @@ namespace zmq { // This allows pipepair to create pipe objects. friend int pipepair (zmq::object_t *parents_ [2], - zmq::pipe_t* pipes_ [2], int hwms_ [2], bool delays_ [2]); + zmq::pipe_t* pipes_ [2], int hwms_ [2], bool delays_ [2], + bool conflate_ [2]); public: @@ -110,7 +113,7 @@ namespace zmq private: // Type of the underlying lock-free pipe. - typedef ypipe_t upipe_t; + typedef ypipe_base_t upipe_t; // Command handlers. void process_activate_read (); @@ -125,7 +128,7 @@ namespace zmq // Constructor is private. Pipe can only be created using // pipepair function. pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_, - int inhwm_, int outhwm_, bool delay_); + int inhwm_, int outhwm_, bool delay_, bool conflate_); // Pipepair uses this function to let us know about // the peer pipe object. @@ -196,6 +199,8 @@ namespace zmq // Computes appropriate low watermark from the given high watermark. static int compute_lwm (int hwm_); + bool conflate; + // Disable copying. pipe_t (const pipe_t&); const pipe_t &operator = (const pipe_t&); diff --git a/src/session_base.cpp b/src/session_base.cpp index 302fec8a..025bafd1 100644 --- a/src/session_base.cpp +++ b/src/session_base.cpp @@ -300,7 +300,8 @@ int zmq::session_base_t::zap_connect () pipe_t *new_pipes [2] = {NULL, NULL}; int hwms [2] = {0, 0}; bool delays [2] = {false, false}; - int rc = pipepair (parents, new_pipes, hwms, delays); + bool conflates [2] = {false, false}; + int rc = pipepair (parents, new_pipes, hwms, delays, conflates); errno_assert (rc == 0); // Attach local end of the pipe to this socket object. @@ -331,9 +332,19 @@ void zmq::session_base_t::process_attach (i_engine *engine_) if (!pipe && !is_terminating ()) { object_t *parents [2] = {this, socket}; pipe_t *pipes [2] = {NULL, NULL}; - int hwms [2] = {options.rcvhwm, options.sndhwm}; + + bool conflate = options.conflate && + (options.type == ZMQ_DEALER || + options.type == ZMQ_PULL || + options.type == ZMQ_PUSH || + options.type == ZMQ_PUB || + options.type == ZMQ_SUB); + + int hwms [2] = {conflate? -1 : options.rcvhwm, + conflate? -1 : options.sndhwm}; bool delays [2] = {options.delay_on_close, options.delay_on_disconnect}; - int rc = pipepair (parents, pipes, hwms, delays); + bool conflates [2] = {conflate, conflate}; + int rc = pipepair (parents, pipes, hwms, delays, conflates); errno_assert (rc == 0); // Plug the local end of the pipe. diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 8034d143..fea6c85a 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -450,9 +450,18 @@ int zmq::socket_base_t::connect (const char *addr_) // Create a bi-directional pipe to connect the peers. object_t *parents [2] = {this, peer.socket}; pipe_t *new_pipes [2] = {NULL, NULL}; - int hwms [2] = {sndhwm, rcvhwm}; + + bool conflate = options.conflate && + (options.type == ZMQ_DEALER || + options.type == ZMQ_PULL || + options.type == ZMQ_PUSH || + options.type == ZMQ_PUB || + options.type == ZMQ_SUB); + + int hwms [2] = {conflate? -1 : sndhwm, conflate? -1 : rcvhwm}; bool delays [2] = {options.delay_on_disconnect, options.delay_on_close}; - int rc = pipepair (parents, new_pipes, hwms, delays); + bool conflates [2] = {conflate, conflate}; + int rc = pipepair (parents, new_pipes, hwms, delays, conflates); errno_assert (rc == 0); // Attach local end of the pipe to this socket object. @@ -553,9 +562,19 @@ int zmq::socket_base_t::connect (const char *addr_) // Create a bi-directional pipe. object_t *parents [2] = {this, session}; pipe_t *new_pipes [2] = {NULL, NULL}; - int hwms [2] = {options.sndhwm, options.rcvhwm}; + + bool conflate = options.conflate && + (options.type == ZMQ_DEALER || + options.type == ZMQ_PULL || + options.type == ZMQ_PUSH || + options.type == ZMQ_PUB || + options.type == ZMQ_SUB); + + int hwms [2] = {conflate? -1 : options.sndhwm, + conflate? -1 : options.rcvhwm}; bool delays [2] = {options.delay_on_disconnect, options.delay_on_close}; - rc = pipepair (parents, new_pipes, hwms, delays); + bool conflates [2] = {conflate, conflate}; + rc = pipepair (parents, new_pipes, hwms, delays, conflates); errno_assert (rc == 0); // Attach local end of the pipe to the socket object. diff --git a/src/ypipe.hpp b/src/ypipe.hpp index 182df4e7..86e5d01c 100644 --- a/src/ypipe.hpp +++ b/src/ypipe.hpp @@ -23,6 +23,7 @@ #include "atomic_ptr.hpp" #include "yqueue.hpp" #include "platform.hpp" +#include "ypipe_base.hpp" namespace zmq { @@ -34,7 +35,7 @@ namespace zmq // N is granularity of the pipe, i.e. how many items are needed to // perform next memory allocation. - template class ypipe_t + template class ypipe_t : public ypipe_base_t { public: diff --git a/src/ypipe_base.hpp b/src/ypipe_base.hpp new file mode 100644 index 00000000..b7e7081b --- /dev/null +++ b/src/ypipe_base.hpp @@ -0,0 +1,44 @@ + +/* + Copyright (c) 2007-2013 Contributors as noted in the AUTHORS file + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#ifndef __ZMQ_YPIPE_BASE_HPP_INCLUDED__ +#define __ZMQ_YPIPE_BASE_HPP_INCLUDED__ + + +namespace zmq +{ + // ypipe_base abstracts ypipe and ypipe_conflate specific + // classes, one is selected according to a the conflate + // socket option + + template class ypipe_base_t + { + public: + virtual ~ypipe_base_t () {} + virtual void write (const T &value_, bool incomplete_) = 0; + virtual bool unwrite (T *value_) = 0; + virtual bool flush () = 0; + virtual bool check_read () = 0; + virtual bool read (T *value_) = 0; + virtual bool probe (bool (*fn)(T &)) = 0; + }; +} + +#endif diff --git a/src/ypipe_conflate.hpp b/src/ypipe_conflate.hpp new file mode 100644 index 00000000..6dc20ef9 --- /dev/null +++ b/src/ypipe_conflate.hpp @@ -0,0 +1,127 @@ +/* + Copyright (c) 2007-2013 Contributors as noted in the AUTHORS file + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#ifndef __ZMQ_YPIPE_CONFLATE_HPP_INCLUDED__ +#define __ZMQ_YPIPE_CONFLATE_HPP_INCLUDED__ + +#include "platform.hpp" +#include "dbuffer.hpp" +#include "ypipe_base.hpp" + +namespace zmq +{ + + // Adapter for dbuffer, to plug it in instead of a queue for the sake + // of implementing the conflate socket option, which, if set, makes + // the receiving side to discard all incoming messages but the last one. + // + // reader_awake flag is needed here to mimic ypipe delicate behaviour + // around the reader being asleep (see 'c' pointer being NULL in ypipe.hpp) + + template class ypipe_conflate_t : public ypipe_base_t + { + public: + + // Initialises the pipe. + inline ypipe_conflate_t () + : reader_awake(false) + { + } + + // The destructor doesn't have to be virtual. It is mad virtual + // just to keep ICC and code checking tools from complaining. + inline virtual ~ypipe_conflate_t () + { + } + + // Following function (write) deliberately copies uninitialised data + // when used with zmq_msg. Initialising the VSM body for + // non-VSM messages won't be good for performance. + +#ifdef ZMQ_HAVE_OPENVMS +#pragma message save +#pragma message disable(UNINIT) +#endif + inline void write (const T &value_, bool incomplete_) + { + (void)incomplete_; + + dbuffer.write (value_); + } + +#ifdef ZMQ_HAVE_OPENVMS +#pragma message restore +#endif + + // There are no incomplete items for conflate ypipe + inline bool unwrite (T *value_) + { + return false; + } + + // Flush is no-op for conflate ypipe. Reader asleep behaviour + // is as of the usual ypipe. + // Returns false if the reader thread is sleeping. In that case, + // caller is obliged to wake the reader up before using the pipe again. + inline bool flush () + { + return reader_awake; + } + + // Check whether item is available for reading. + inline bool check_read () + { + bool res = dbuffer.check_read (); + if (!res) + reader_awake = false; + + return res; + } + + // Reads an item from the pipe. Returns false if there is no value. + // available. + inline bool read (T *value_) + { + if (!check_read ()) + return false; + + return dbuffer.read (value_); + } + + // Applies the function fn to the first elemenent in the pipe + // and returns the value returned by the fn. + // The pipe mustn't be empty or the function crashes. + inline bool probe (bool (*fn)(T &)) + { + return dbuffer.probe (fn); + } + + protected: + + dbuffer_t dbuffer; + bool reader_awake; + + // Disable copying of ypipe object. + ypipe_conflate_t (const ypipe_conflate_t&); + const ypipe_conflate_t &operator = (const ypipe_conflate_t&); + }; + +} + +#endif From f59c1a5c5aec15cd6117eee951ad51ac302ca1ad Mon Sep 17 00:00:00 2001 From: danielkr Date: Sat, 17 Aug 2013 23:55:00 +0300 Subject: [PATCH 7/9] Update doc for ZMQ_CONFLATE socket option --- doc/zmq_setsockopt.txt | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/doc/zmq_setsockopt.txt b/doc/zmq_setsockopt.txt index c2d77808..b5fdc723 100644 --- a/doc/zmq_setsockopt.txt +++ b/doc/zmq_setsockopt.txt @@ -682,6 +682,22 @@ Default value:: NULL Applicable socket types:: all, when using TCP transport +ZMQ_CONFLATE: Keep only last message +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +If set, a socket shall keep only one message in its inbound/outbound +queue, this message being the last message received/the last message +to be sent. +Ignores 'ZMQ_RECVHWM' and 'ZMQ_SENDHWM' options. +Does not supports multi-part messages, in particular, only one part of it +is kept in the socket internal queue. +[horizontal] +Option value type:: int +Option value unit:: boolean +Default value:: 0 (false) +Applicable socket types:: ZMQ_PULL, ZMQ_PUSH, ZMQ_SUB, ZMQ_PUB, ZMQ_DEALER + + RETURN VALUE ------------ The _zmq_setsockopt()_ function shall return zero if successful. Otherwise it From da4a70d59d14f68d5cb463cce01e9a772f6ca5a2 Mon Sep 17 00:00:00 2001 From: danielkr Date: Mon, 19 Aug 2013 08:18:20 +0300 Subject: [PATCH 8/9] Fix indentation --- src/pipe.cpp | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/pipe.cpp b/src/pipe.cpp index 6bc7addd..5669005a 100644 --- a/src/pipe.cpp +++ b/src/pipe.cpp @@ -320,8 +320,7 @@ void zmq::pipe_t::process_pipe_term_ack () // hand because msg_t doesn't have automatic destructor. Then deallocate // the ypipe itself. - if (!conflate) - { + if (!conflate) { msg_t msg; while (inpipe->read (&msg)) { int rc = msg.close (); From 3f3777d05bada63414339bedb7aefc56051836c2 Mon Sep 17 00:00:00 2001 From: danielkr Date: Mon, 19 Aug 2013 15:34:11 +0300 Subject: [PATCH 9/9] Add test for ZMQ_CONFLATE option --- tests/Makefile.am | 4 ++- tests/test_conflate.cpp | 77 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 80 insertions(+), 1 deletion(-) create mode 100644 tests/test_conflate.cpp diff --git a/tests/Makefile.am b/tests/Makefile.am index 6464428f..5bc8d12f 100644 --- a/tests/Makefile.am +++ b/tests/Makefile.am @@ -31,7 +31,8 @@ noinst_PROGRAMS = test_pair_inproc \ test_spec_router \ test_spec_pushpull \ test_req_request_ids \ - test_req_strict + test_req_strict \ + test_conflate if !ON_MINGW noinst_PROGRAMS += test_shutdown_stress \ @@ -69,6 +70,7 @@ test_spec_router_SOURCES = test_spec_router.cpp test_spec_pushpull_SOURCES = test_spec_pushpull.cpp test_req_request_ids_SOURCES = test_req_request_ids.cpp test_req_strict_SOURCES = test_req_strict.cpp +test_conflate_SOURCES = test_conflate.cpp if !ON_MINGW test_shutdown_stress_SOURCES = test_shutdown_stress.cpp test_pair_ipc_SOURCES = test_pair_ipc.cpp testutil.hpp diff --git a/tests/test_conflate.cpp b/tests/test_conflate.cpp new file mode 100644 index 00000000..bbc7646d --- /dev/null +++ b/tests/test_conflate.cpp @@ -0,0 +1,77 @@ +/* + Copyright (c) 2007-2013 Contributors as noted in the AUTHORS file + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include +#include "../include/zmq_utils.h" +#include "testutil.hpp" + +int main (int argc, char *argv []) +{ + const char *bind_to = "tcp://127.0.0.1:77008"; + + int rc; + + void* ctx = zmq_init (1); + assert (ctx); + + void* s_in = zmq_socket (ctx, ZMQ_PULL); + assert (s_in); + + int conflate = 1; + rc = zmq_setsockopt (s_in, ZMQ_CONFLATE, &conflate, sizeof(conflate)); + assert (rc == 0); + + rc = zmq_bind (s_in, bind_to); + assert (rc == 0); + + void* s_out = zmq_socket (ctx, ZMQ_PUSH); + assert (s_out); + + rc = zmq_connect (s_out, bind_to); + assert (rc == 0); + + int message_count = 20; + + for (int j = 0; j < message_count; ++j) { + rc = zmq_send(s_out, (void*)&j, sizeof(int), 0); + if (rc < 0) { + printf ("error in zmq_sendmsg: %s\n", zmq_strerror (errno)); + return -1; + } + } + + zmq_sleep (1); + + int payload_recved = 0; + rc = zmq_recv(s_in, (void*)&payload_recved, sizeof(int), 0); + assert (rc > 0); + assert (payload_recved == message_count - 1); + + + rc = zmq_close (s_in); + assert (rc == 0); + + rc = zmq_close (s_out); + assert (rc == 0); + + rc = zmq_term (ctx); + assert (rc == 0); + + return 0; +}