This patch adds a new member of type deque to the
xpub class that contains pointers to metadata_t.
This deque is then used (alongside pending_data
and pending_flags) to preserve metadata when
copying messages.
While sending very large messages (far beyond what fits in a the tcp
buffer, so it takes multiple sendto system calls for it to finish),
zmq_close will close the connection regardless of ZMQ_LINGER.
In case no engine is attached, a pipe->check_read() is needed to look
for the delimiter in the pipe and ultimately trigger the pipe
termination.
However, if there *is* an engine attached, the check_read() looks ahead
and finds the delimiter and terminates the connection even though the
engine might actually still be in the middle of sending a message.
This happens because while the io_thread is still busy sending the data,
the pipe can get terminated and the io thread ends up being terminated.
Despite the old comments, re-initing the msg_t leaks a refcount to
metadata in some situations.
v1_decoder looks like it isn't tested any more, but it seems like a good
idea to fix it because it has the exact same piece of buggy code
v2_decoder does.
Fixes not receiving unsubscription messages in XPUB socket with
ZMQ_XPUB_VERBOSE and using a XSUB-XPUB proxy in front.
This adds two modifications:
- It adds a new flag, ZMQ_XPUB_VERBOSE_UNSUBSCRIBE, to enable verbose
unsubscription messages, necessary when using a XSUB/XPUB proxy.
- It adds a boolean switch to zmq::mtrie_t::rm () to control if the
callback is invoked every time or only in the last removal. Necessary
when a pipe is terminated and the verbose mode for unsubscriptions is
enabled.
A memcpy is eliminated when receiving data on a ZMQ_STREAM socket. Instead
of receiving into a static buffer and then copying the data into the
buffer malloced in msg_t::init_size, the raw_decoder allocates the memory
for together with the reference-counter and creates a msg_t object
on top of that memory. This saves the memcpy operation.
For small messages, data is still copied and the receive buffer is reused.