diff --git a/src/msg.cpp b/src/msg.cpp index a134a5b0..bed4cce8 100644 --- a/src/msg.cpp +++ b/src/msg.cpp @@ -45,11 +45,30 @@ typedef char zmq_msg_size_check [2 * ((sizeof (zmq::msg_t) == sizeof (zmq_msg_t)) != 0) - 1]; +// check whether the size of atomic_counter_t matches the size of the wrapped integer +// to ensure that the lsmg union is correctly aligned +typedef char zmq_msg_size_check + [2 * ((sizeof (zmq::atomic_counter_t) == sizeof (zmq::atomic_counter_t::integer_t)) != 0) - 1]; + bool zmq::msg_t::check () { return u.base.type >= type_min && u.base.type <= type_max; } +int zmq::msg_t::init (void *data_, size_t size_, msg_free_fn *ffn_, void *hint_) +{ + if (size_ <= max_vsm_size) + { + int rc = init_size(size_); + memcpy(data(), data_, size_); + return rc; + } + else + { + return init_data(data_, size_, ffn_, hint_); + } +} + int zmq::msg_t::init () { u.vsm.metadata = NULL; @@ -76,18 +95,16 @@ int zmq::msg_t::init_size (size_t size_) u.lmsg.type = type_lmsg; u.lmsg.flags = 0; u.lmsg.routing_id = 0; - u.lmsg.content = - (content_t*) malloc (sizeof (content_t) + size_); - if (unlikely (!u.lmsg.content)) { + u.lmsg.data = malloc(size_); + if (unlikely (!u.lmsg.data)) { errno = ENOMEM; return -1; } - u.lmsg.content->data = u.lmsg.content + 1; - u.lmsg.content->size = size_; - u.lmsg.content->ffn = NULL; - u.lmsg.content->hint = NULL; - new (&u.lmsg.content->refcnt) zmq::atomic_counter_t (); + u.lmsg.size = size_; + u.lmsg.ffn = NULL; + u.lmsg.hint = NULL; + new (&u.lmsg.refcnt.counter) zmq::atomic_counter_t (); } return 0; } @@ -115,17 +132,12 @@ int zmq::msg_t::init_data (void *data_, size_t size_, msg_free_fn *ffn_, u.lmsg.type = type_lmsg; u.lmsg.flags = 0; u.lmsg.routing_id = 0; - u.lmsg.content = (content_t*) malloc (sizeof (content_t)); - if (!u.lmsg.content) { - errno = ENOMEM; - return -1; - } - u.lmsg.content->data = data_; - u.lmsg.content->size = size_; - u.lmsg.content->ffn = ffn_; - u.lmsg.content->hint = hint_; - new (&u.lmsg.content->refcnt) zmq::atomic_counter_t (); + u.lmsg.data = data_; + u.lmsg.size = size_; + u.lmsg.ffn = ffn_; + u.lmsg.hint = hint_; + new (&u.lmsg.refcnt.counter) zmq::atomic_counter_t (); } return 0; @@ -140,6 +152,13 @@ int zmq::msg_t::init_delimiter () return 0; } +zmq::atomic_counter_t& zmq::msg_t::msg_counter() +{ + zmq_assert( is_lmsg() ); + void* ptr = static_cast( &u.lmsg.refcnt.counter ); + return *static_cast( ptr ); +} + int zmq::msg_t::close () { // Check the validity of the message. @@ -153,16 +172,14 @@ int zmq::msg_t::close () // If the content is not shared, or if it is shared and the reference // count has dropped to zero, deallocate it. if (!(u.lmsg.flags & msg_t::shared) || - !u.lmsg.content->refcnt.sub (1)) { + !msg_counter().sub (1)) { - // We used "placement new" operator to initialize the reference - // counter so we call the destructor explicitly now. - u.lmsg.content->refcnt.~atomic_counter_t (); - - if (u.lmsg.content->ffn) - u.lmsg.content->ffn (u.lmsg.content->data, - u.lmsg.content->hint); - free (u.lmsg.content); + if (u.lmsg.ffn) { + u.lmsg.ffn(u.lmsg.data, u.lmsg.hint); + } + else { + free (u.lmsg.data); + } } } @@ -214,10 +231,10 @@ int zmq::msg_t::copy (msg_t &src_) // One reference is added to shared messages. Non-shared messages // are turned into shared messages and reference count is set to 2. if (src_.u.lmsg.flags & msg_t::shared) - src_.u.lmsg.content->refcnt.add (1); + src_.msg_counter().add (1); else { src_.u.lmsg.flags |= msg_t::shared; - src_.u.lmsg.content->refcnt.set (2); + src_.msg_counter().set (2); } } @@ -239,7 +256,7 @@ void *zmq::msg_t::data () case type_vsm: return u.vsm.data; case type_lmsg: - return u.lmsg.content->data; + return u.lmsg.data; case type_cmsg: return u.cmsg.data; default: @@ -257,7 +274,7 @@ size_t zmq::msg_t::size () case type_vsm: return u.vsm.size; case type_lmsg: - return u.lmsg.content->size; + return u.lmsg.size; case type_cmsg: return u.cmsg.size; default: @@ -333,6 +350,11 @@ bool zmq::msg_t::is_vsm () return u.base.type == type_vsm; } +bool zmq::msg_t::is_lmsg () const +{ + return u.base.type == type_lmsg; +} + bool zmq::msg_t::is_cmsg () { return u.base.type == type_cmsg; @@ -353,9 +375,9 @@ void zmq::msg_t::add_refs (int refs_) // message type that needs special care are long messages. if (u.base.type == type_lmsg) { if (u.lmsg.flags & msg_t::shared) - u.lmsg.content->refcnt.add (refs_); + msg_counter().add (refs_); else { - u.lmsg.content->refcnt.set (refs_ + 1); + msg_counter().set (refs_ + 1); u.lmsg.flags |= msg_t::shared; } } @@ -379,14 +401,14 @@ bool zmq::msg_t::rm_refs (int refs_) } // The only message type that needs special care are long messages. - if (!u.lmsg.content->refcnt.sub (refs_)) { + if (!msg_counter().sub (refs_)) { // We used "placement new" operator to initialize the reference // counter so we call the destructor explicitly now. - u.lmsg.content->refcnt.~atomic_counter_t (); + msg_counter().~atomic_counter_t (); - if (u.lmsg.content->ffn) - u.lmsg.content->ffn (u.lmsg.content->data, u.lmsg.content->hint); - free (u.lmsg.content); + if (u.lmsg.ffn) + u.lmsg.ffn (u.lmsg.data, u.lmsg.hint); + free (u.lmsg.data); return false; } @@ -404,3 +426,4 @@ int zmq::msg_t::set_routing_id(uint32_t routing_id_) u.base.routing_id = routing_id_; return 0; } + diff --git a/src/msg.hpp b/src/msg.hpp index 647c1a20..6170a23a 100644 --- a/src/msg.hpp +++ b/src/msg.hpp @@ -54,7 +54,6 @@ namespace zmq class msg_t { public: - // Message flags. enum { @@ -66,6 +65,8 @@ namespace zmq }; bool check (); + int init (void *data_, size_t size_, msg_free_fn *ffn_, + void *hint_); int init (); int init_size (size_t size_); int init_data (void *data_, size_t size_, msg_free_fn *ffn_, @@ -88,6 +89,7 @@ namespace zmq bool is_credential () const; bool is_delimiter () const; bool is_vsm (); + bool is_lmsg () const; bool is_cmsg (); uint32_t get_routing_id(); int set_routing_id(uint32_t routing_id_); @@ -107,22 +109,6 @@ namespace zmq enum { msg_t_size = 64 }; enum { max_vsm_size = msg_t_size - (8 + sizeof (metadata_t *) + 3 + sizeof(uint32_t)) }; - // Shared message buffer. Message data are either allocated in one - // continuous block along with this structure - thus avoiding one - // malloc/free pair or they are stored in used-supplied memory. - // In the latter case, ffn member stores pointer to the function to be - // used to deallocate the data. If the buffer is actually shared (there - // are at least 2 references to it) refcount member contains number of - // references. - struct content_t - { - void *data; - size_t size; - msg_free_fn *ffn; - void *hint; - zmq::atomic_counter_t refcnt; - }; - // Different message types. enum type_t { @@ -138,6 +124,8 @@ namespace zmq type_max = 104 }; + atomic_counter_t& msg_counter(); + // the file descriptor where this message originated, needs to be 64bit due to alignment int64_t file_desc; @@ -161,10 +149,32 @@ namespace zmq unsigned char flags; uint32_t routing_id; } vsm; - struct { + struct lmsg_t { metadata_t *metadata; - content_t *content; - unsigned char unused [msg_t_size - (8 + sizeof (metadata_t *) + sizeof (content_t*) + 2 + sizeof(uint32_t))]; + // Shared message buffer. Message data are either allocated in one + // continuous block along with this structure - thus avoiding one + // malloc/free pair or they are stored in used-supplied memory. + // In the latter case, ffn member stores pointer to the function to be + // used to deallocate the data. If the buffer is actually shared (there + // are at least 2 references to it) refcount member contains number of + // references. + void *data; + size_t size; + msg_free_fn *ffn; + void *hint; + // create an aligned block for an atomic_counter_t object + union aligned_atomic_counter_storage { + zmq::atomic_counter_t::integer_t maxAlign; + unsigned char counter[ sizeof(zmq::atomic_counter_t) ]; + } refcnt; + unsigned char unused [msg_t_size - (8 + sizeof (metadata_t *) + + sizeof(void*) + + sizeof(size_t) + + sizeof(msg_free_fn*) + + sizeof(void*) + + sizeof(aligned_atomic_counter_storage) + + 2 + + sizeof(uint32_t))]; unsigned char type; unsigned char flags; uint32_t routing_id;