0
0
mirror of https://github.com/zeromq/libzmq.git synced 2024-12-27 15:41:05 +08:00

send/recv was changed to send/recv/sendmsg/recvmsg

send/recv now complies with POSIX by using raw buffers instead
of message objects and by returning number of bytes sent/recvd
instead of 0/-1.

The return value is changed accordingly for sendmsg and recvmsg.

Note that related man pages will be fixed in a separate patch.

Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
This commit is contained in:
Martin Sustrik 2011-03-24 11:53:55 +01:00
parent d4e83d2601
commit fb27a000d9
8 changed files with 98 additions and 85 deletions

View File

@ -216,8 +216,10 @@ ZMQ_EXPORT int zmq_getsockopt (void *s, int option, void *optval,
size_t *optvallen); size_t *optvallen);
ZMQ_EXPORT int zmq_bind (void *s, const char *addr); ZMQ_EXPORT int zmq_bind (void *s, const char *addr);
ZMQ_EXPORT int zmq_connect (void *s, const char *addr); ZMQ_EXPORT int zmq_connect (void *s, const char *addr);
ZMQ_EXPORT int zmq_send (void *s, zmq_msg_t *msg, int flags); ZMQ_EXPORT int zmq_send (void *s, const void *buf, size_t len, int flags);
ZMQ_EXPORT int zmq_recv (void *s, zmq_msg_t *msg, int flags); ZMQ_EXPORT int zmq_recv (void *s, void *buf, size_t len, int flags);
ZMQ_EXPORT int zmq_sendmsg (void *s, zmq_msg_t *msg, int flags);
ZMQ_EXPORT int zmq_recvmsg (void *s, zmq_msg_t *msg, int flags);
/******************************************************************************/ /******************************************************************************/
/* I/O multiplexing. */ /* I/O multiplexing. */

View File

@ -68,18 +68,18 @@ int main (int argc, char *argv [])
} }
for (i = 0; i != roundtrip_count; i++) { for (i = 0; i != roundtrip_count; i++) {
rc = zmq_recv (s, &msg, 0); rc = zmq_recvmsg (s, &msg, 0);
if (rc != 0) { if (rc < 0) {
printf ("error in zmq_recv: %s\n", zmq_strerror (errno)); printf ("error in zmq_recvmsg: %s\n", zmq_strerror (errno));
return -1; return -1;
} }
if (zmq_msg_size (&msg) != message_size) { if (zmq_msg_size (&msg) != message_size) {
printf ("message of incorrect size received\n"); printf ("message of incorrect size received\n");
return -1; return -1;
} }
rc = zmq_send (s, &msg, 0); rc = zmq_sendmsg (s, &msg, 0);
if (rc != 0) { if (rc < 0) {
printf ("error in zmq_send: %s\n", zmq_strerror (errno)); printf ("error in zmq_sendmsg: %s\n", zmq_strerror (errno));
return -1; return -1;
} }
} }

View File

@ -79,9 +79,9 @@ int main (int argc, char *argv [])
return -1; return -1;
} }
rc = zmq_recv (s, &msg, 0); rc = zmq_recvmsg (s, &msg, 0);
if (rc != 0) { if (rc < 0) {
printf ("error in zmq_recv: %s\n", zmq_strerror (errno)); printf ("error in zmq_recvmsg: %s\n", zmq_strerror (errno));
return -1; return -1;
} }
if (zmq_msg_size (&msg) != message_size) { if (zmq_msg_size (&msg) != message_size) {
@ -92,9 +92,9 @@ int main (int argc, char *argv [])
watch = zmq_stopwatch_start (); watch = zmq_stopwatch_start ();
for (i = 0; i != message_count - 1; i++) { for (i = 0; i != message_count - 1; i++) {
rc = zmq_recv (s, &msg, 0); rc = zmq_recvmsg (s, &msg, 0);
if (rc != 0) { if (rc < 0) {
printf ("error in zmq_recv: %s\n", zmq_strerror (errno)); printf ("error in zmq_recvmsg: %s\n", zmq_strerror (errno));
return -1; return -1;
} }
if (zmq_msg_size (&msg) != message_size) { if (zmq_msg_size (&msg) != message_size) {

View File

@ -75,14 +75,14 @@ int main (int argc, char *argv [])
watch = zmq_stopwatch_start (); watch = zmq_stopwatch_start ();
for (i = 0; i != roundtrip_count; i++) { for (i = 0; i != roundtrip_count; i++) {
rc = zmq_send (s, &msg, 0); rc = zmq_sendmsg (s, &msg, 0);
if (rc != 0) { if (rc < 0) {
printf ("error in zmq_send: %s\n", zmq_strerror (errno)); printf ("error in zmq_sendmsg: %s\n", zmq_strerror (errno));
return -1; return -1;
} }
rc = zmq_recv (s, &msg, 0); rc = zmq_recvmsg (s, &msg, 0);
if (rc != 0) { if (rc < 0) {
printf ("error in zmq_recv: %s\n", zmq_strerror (errno)); printf ("error in zmq_recvmsg: %s\n", zmq_strerror (errno));
return -1; return -1;
} }
if (zmq_msg_size (&msg) != message_size) { if (zmq_msg_size (&msg) != message_size) {

View File

@ -76,9 +76,9 @@ int main (int argc, char *argv [])
memset (zmq_msg_data (&msg), 0, message_size); memset (zmq_msg_data (&msg), 0, message_size);
#endif #endif
rc = zmq_send (s, &msg, 0); rc = zmq_sendmsg (s, &msg, 0);
if (rc != 0) { if (rc < 0) {
printf ("error in zmq_send: %s\n", zmq_strerror (errno)); printf ("error in zmq_sendmsg: %s\n", zmq_strerror (errno));
return -1; return -1;
} }
rc = zmq_msg_close (&msg); rc = zmq_msg_close (&msg);

View File

@ -343,22 +343,73 @@ int zmq_connect (void *s_, const char *addr_)
return (((zmq::socket_base_t*) s_)->connect (addr_)); return (((zmq::socket_base_t*) s_)->connect (addr_));
} }
int zmq_send (void *s_, zmq_msg_t *msg_, int flags_) int zmq_send (void *s_, const void *buf_, size_t len_, int flags_)
{ {
if (!s_) { zmq_msg_t msg;
errno = EFAULT; int rc = zmq_msg_init_size (&msg, len_);
if (rc != 0)
return -1;
memcpy (zmq_msg_data (&msg), buf_, len_);
rc = zmq_sendmsg (s_, &msg, flags_);
if (unlikely (rc < 0)) {
int err = errno;
int rc2 = zmq_msg_close (&msg);
errno_assert (rc2 == 0);
errno = err;
return -1; return -1;
} }
return (((zmq::socket_base_t*) s_)->send (msg_, flags_));
// Note the optimisation here. We don't close the msg object as it is
// empty anyway. This may change when implementation of zmq_msg_t changes.
return rc;
} }
int zmq_recv (void *s_, zmq_msg_t *msg_, int flags_) int zmq_recv (void *s_, void *buf_, size_t len_, int flags_)
{
zmq_msg_t msg;
int rc = zmq_msg_init (&msg);
errno_assert (rc == 0);
rc = zmq_recvmsg (s_, &msg, flags_);
if (unlikely (rc < 0)) {
int err = errno;
int rc2 = zmq_msg_close (&msg);
errno_assert (rc2 == 0);
errno = err;
return -1;
}
// At the moment an oversized message is silently truncated.
// TODO: Build in a notification mechanism to report the overflows.
size_t to_copy = size_t (rc) < len_ ? size_t (rc) : len_;
memcpy (buf_, zmq_msg_data (&msg), to_copy);
return (int) to_copy;
}
int zmq_sendmsg (void *s_, zmq_msg_t *msg_, int flags_)
{ {
if (!s_) { if (!s_) {
errno = EFAULT; errno = EFAULT;
return -1; return -1;
} }
return (((zmq::socket_base_t*) s_)->recv (msg_, flags_)); int sz = (int) zmq_msg_size (msg_);
int rc = (((zmq::socket_base_t*) s_)->send (msg_, flags_));
if (unlikely (rc < 0))
return -1;
return sz;
}
int zmq_recvmsg (void *s_, zmq_msg_t *msg_, int flags_)
{
if (!s_) {
errno = EFAULT;
return -1;
}
int rc = (((zmq::socket_base_t*) s_)->recv (msg_, flags_));
if (unlikely (rc < 0))
return -1;
return (int) zmq_msg_size (msg_);
} }
#if defined ZMQ_FORCE_SELECT #if defined ZMQ_FORCE_SELECT

View File

@ -49,49 +49,25 @@ int main (int argc, char *argv [])
// Try to send 10 messages. Only 4 should succeed. // Try to send 10 messages. Only 4 should succeed.
for (int i = 0; i < 10; i++) for (int i = 0; i < 10; i++)
{ {
zmq_msg_t msg; int rc = zmq_send (sc, NULL, 0, ZMQ_NOBLOCK);
rc = zmq_msg_init (&msg);
assert (rc == 0);
int rc = zmq_send (sc, &msg, ZMQ_NOBLOCK);
if (i < 4) if (i < 4)
assert (rc == 0); assert (rc == 0);
else else
assert (rc != 0 && errno == EAGAIN); assert (rc < 0 && errno == EAGAIN);
rc = zmq_msg_close (&msg);
assert (rc == 0);
} }
// There should be now 4 messages pending, consume them. // There should be now 4 messages pending, consume them.
for (int i = 0; i != 4; i++) { for (int i = 0; i != 4; i++) {
rc = zmq_recv (sb, NULL, 0, 0);
zmq_msg_t msg;
rc = zmq_msg_init (&msg);
assert (rc == 0);
rc = zmq_recv (sb, &msg, 0);
assert (rc == 0);
rc = zmq_msg_close (&msg);
assert (rc == 0); assert (rc == 0);
} }
// Now it should be possible to send one more. // Now it should be possible to send one more.
zmq_msg_t msg; rc = zmq_send (sc, NULL, 0, 0);
rc = zmq_msg_init (&msg);
assert (rc == 0);
rc = zmq_send (sc, &msg, 0);
assert (rc == 0);
rc = zmq_msg_close (&msg);
assert (rc == 0); assert (rc == 0);
// Consume the remaining message. // Consume the remaining message.
rc = zmq_msg_init (&msg); rc = zmq_recv (sb, NULL, 0, 0);
assert (rc == 0);
rc = zmq_recv (sb, &msg, 0);
assert (rc == 0);
rc = zmq_msg_close (&msg);
assert (rc == 0); assert (rc == 0);
rc = zmq_close (sc); rc = zmq_close (sc);

View File

@ -31,39 +31,23 @@ inline void bounce (void *sb, void *sc)
const char *content = "12345678ABCDEFGH12345678abcdefgh"; const char *content = "12345678ABCDEFGH12345678abcdefgh";
// Send the message. // Send the message.
zmq_msg_t msg1; int rc = zmq_send (sc, content, 32, 0);
int rc = zmq_msg_init_size (&msg1, 32); assert (rc == 32);
memcpy (zmq_msg_data (&msg1), content, 32);
rc = zmq_send (sc, &msg1, 0);
assert (rc == 0);
rc = zmq_msg_close (&msg1);
assert (rc == 0);
// Bounce the message back. // Bounce the message back.
zmq_msg_t msg2; char buf1 [32];
rc = zmq_msg_init (&msg2); rc = zmq_recv (sb, buf1, 32, 0);
assert (rc == 0); assert (rc == 32);
rc = zmq_recv (sb, &msg2, 0); rc = zmq_send (sb, buf1, 32, 0);
assert (rc == 0); assert (rc == 32);
rc = zmq_send (sb, &msg2, 0);
assert (rc == 0);
rc = zmq_msg_close (&msg2);
assert (rc == 0);
// Receive the bounced message. // Receive the bounced message.
zmq_msg_t msg3; char buf2 [32];
rc = zmq_msg_init (&msg3); rc = zmq_recv (sc, buf2, 32, 0);
assert (rc == 0); assert (rc == 32);
rc = zmq_recv (sc, &msg3, 0);
assert (rc == 0);
// Check whether the message is still the same. // Check whether the message is still the same.
assert (zmq_msg_size (&msg3) == 32); assert (memcmp (buf2, content, 32) == 0);
assert (memcmp (zmq_msg_data (&msg3), content, 32) == 0);
rc = zmq_msg_close (&msg3);
assert (rc == 0);
} }
#endif #endif