diff --git a/src/decoder.hpp b/src/decoder.hpp index 4a0c0ed6..3860af2d 100644 --- a/src/decoder.hpp +++ b/src/decoder.hpp @@ -143,8 +143,9 @@ namespace zmq } } - inline bool message_ready_size (size_t msg_sz){ - zmq_assert(false); + inline bool message_ready_size (size_t msg_sz) + { + zmq_assert (false); return false; } diff --git a/src/i_decoder.hpp b/src/i_decoder.hpp index 549fc133..ab4b96fc 100644 --- a/src/i_decoder.hpp +++ b/src/i_decoder.hpp @@ -41,7 +41,7 @@ namespace zmq virtual size_t process_buffer (unsigned char *data_, size_t size_) = 0; virtual bool stalled () const = 0; - + virtual bool message_ready_size (size_t msg_sz) = 0; }; diff --git a/src/options.hpp b/src/options.hpp index 403d59fc..2c29e442 100644 --- a/src/options.hpp +++ b/src/options.hpp @@ -114,7 +114,7 @@ namespace zmq // If true, the identity message is forwarded to the socket. bool recv_identity; - + // if true, router socket accepts non-zmq tcp connections bool raw_sock; diff --git a/src/raw_decoder.cpp b/src/raw_decoder.cpp index f7e9925b..d35fd222 100644 --- a/src/raw_decoder.cpp +++ b/src/raw_decoder.cpp @@ -93,7 +93,7 @@ bool zmq::raw_decoder_t::raw_message_ready () // raw_message_ready should never get called in state machine w/o // message_ready_size from stream_engine. next_step (in_progress.data (), 1, - &raw_decoder_t::raw_message_ready); + &raw_decoder_t::raw_message_ready); return true; } diff --git a/src/raw_decoder.hpp b/src/raw_decoder.hpp index 05bacb1f..0dfbb732 100644 --- a/src/raw_decoder.hpp +++ b/src/raw_decoder.hpp @@ -38,7 +38,7 @@ namespace zmq { public: - raw_decoder_t (size_t bufsize_, + raw_decoder_t (size_t bufsize_, int64_t maxmsgsize_, i_msg_sink *msg_sink_); virtual ~raw_decoder_t (); diff --git a/src/raw_encoder.cpp b/src/raw_encoder.cpp index eac12934..7c26b3ee 100644 --- a/src/raw_encoder.cpp +++ b/src/raw_encoder.cpp @@ -58,7 +58,6 @@ bool zmq::raw_encoder_t::raw_message_size_ready () bool zmq::raw_encoder_t::raw_message_ready () { - // Destroy content of the old message. int rc = in_progress.close (); errno_assert (rc == 0); diff --git a/src/raw_encoder.hpp b/src/raw_encoder.hpp index 748a68fd..dd111863 100644 --- a/src/raw_encoder.hpp +++ b/src/raw_encoder.hpp @@ -40,7 +40,6 @@ namespace zmq { - // Encoder for 0MQ framing protocol. Converts messages into data batches. class raw_encoder_t : public encoder_base_t diff --git a/src/router.cpp b/src/router.cpp index 67de97bd..fd824062 100644 --- a/src/router.cpp +++ b/src/router.cpp @@ -78,8 +78,8 @@ void zmq::router_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_) int zmq::router_t::xsetsockopt (int option_, const void *optval_, size_t optvallen_) { - if (option_ != ZMQ_ROUTER_MANDATORY && - option_ != ZMQ_ROUTER_RAW_SOCK) { + if (option_ != ZMQ_ROUTER_MANDATORY + && option_ != ZMQ_ROUTER_RAW_SOCK) { errno = EINVAL; return -1; } @@ -87,16 +87,15 @@ int zmq::router_t::xsetsockopt (int option_, const void *optval_, errno = EINVAL; return -1; } - if(option_ == ZMQ_ROUTER_RAW_SOCK){ - raw_sock = *static_cast (optval_); - if(raw_sock){ + if (option_ == ZMQ_ROUTER_RAW_SOCK) { + raw_sock = *static_cast (optval_); + if (raw_sock) { options.recv_identity = false; options.raw_sock = true; } - - }else{ - mandatory = *static_cast (optval_); } + else + mandatory = *static_cast (optval_); return 0; } @@ -170,8 +169,8 @@ int zmq::router_t::xsend (msg_t *msg_, int flags_) it->second.active = false; current_out = NULL; } - } - else + } + else if (mandatory) { more_out = false; errno = EHOSTUNREACH; @@ -186,9 +185,9 @@ int zmq::router_t::xsend (msg_t *msg_, int flags_) return 0; } - // ignore the MORE flag for raw-sock or assert? - if(options.raw_sock) - msg_->reset_flags(msg_t::more); + // Ignore the MORE flag for raw-sock or assert? + if (options.raw_sock) + msg_->reset_flags (msg_t::more); // Check whether this is the last part of the message. more_out = msg_->flags () & msg_t::more ? true : false; @@ -199,13 +198,13 @@ int zmq::router_t::xsend (msg_t *msg_, int flags_) // Close the remote connection if user has asked to do so // by sending zero length message. // Pending messages in the pipe will be dropped (on receiving term- ack) - if (raw_sock && msg_->size() == 0){ - current_out->terminate(false); + if (raw_sock && msg_->size() == 0) { + current_out->terminate (false); int rc = msg_->close (); errno_assert (rc == 0); current_out = NULL; return 0; - } + } bool ok = current_out->write (msg_); if (unlikely (!ok)) @@ -349,12 +348,13 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_) blob_t identity; bool ok; - if(options.raw_sock){ // always assign identity for raw-socket + if (options.raw_sock) { // Always assign identity for raw-socket unsigned char buf [5]; buf [0] = 0; put_uint32 (buf + 1, next_peer_id++); identity = blob_t (buf, sizeof buf); - }else{ + } + else { msg.init (); ok = pipe_->read (&msg); if (!ok) diff --git a/src/session_base.cpp b/src/session_base.cpp index 97fcc6d3..a3c0fb73 100644 --- a/src/session_base.cpp +++ b/src/session_base.cpp @@ -120,10 +120,10 @@ zmq::session_base_t::session_base_t (class io_thread_t *io_thread_, identity_received (false), addr (addr_) { - // identities are not exchanged for raw sockets - if(options.raw_sock){ - identity_sent = (true); - identity_received = (true); + // Identities are not exchanged for raw sockets + if (options.raw_sock) { + identity_sent = true; + identity_received = true; } } @@ -250,12 +250,12 @@ void zmq::session_base_t::terminated (pipe_t *pipe_) // Remove the pipe from the detached pipes set terminating_pipes.erase (pipe_); - if (!is_terminating() && options.raw_sock){ - if(engine){ + if (!is_terminating () && options.raw_sock) { + if (engine) { engine->terminate (); engine = NULL; } - terminate(); + terminate (); } diff --git a/src/stream_engine.cpp b/src/stream_engine.cpp index df3f9aae..94f8cd32 100644 --- a/src/stream_engine.cpp +++ b/src/stream_engine.cpp @@ -135,7 +135,7 @@ void zmq::stream_engine_t::plug (io_thread_t *io_thread_, io_object_t::plug (io_thread_); handle = add_fd (s); - if(options.raw_sock){ + if (options.raw_sock) { // no handshaking for raw sock, instantiate raw encoder and decoders encoder = new (std::nothrow) raw_encoder_t (out_batch_size, session); alloc_assert (encoder); @@ -146,7 +146,8 @@ void zmq::stream_engine_t::plug (io_thread_t *io_thread_, // disable handshaking for raw socket handshaking = false; - }else{ + } + else { // Send the 'length' and 'flags' fields of the identity message. // The 'length' field is encoded in the long format. outpos = greeting_output_buffer; @@ -215,13 +216,13 @@ void zmq::stream_engine_t::in_event () } } - if(options.raw_sock){ - if(insize == 0 || !decoder->message_ready_size(insize)){ - processed = 0; - }else{ + if (options.raw_sock) { + if (insize == 0 || !decoder->message_ready_size (insize)) + processed = 0; + else processed = decoder->process_buffer (inpos, insize); - } - }else{ + } + else { // Push the data to the decoder. processed = decoder->process_buffer (inpos, insize); } diff --git a/tests/test_raw_sock.cpp b/tests/test_raw_sock.cpp index 99a03e4a..86bc4ec0 100644 --- a/tests/test_raw_sock.cpp +++ b/tests/test_raw_sock.cpp @@ -31,78 +31,72 @@ #include #include #include -#include +#include //ToDo: Windows? const char *test_str = "TEST-STRING"; - -int tcp_client(){ - - int sockfd, portno; +int tcp_client () +{ struct sockaddr_in serv_addr; struct hostent *server; - portno = 5555; + const int portno = 5555; - sockfd = socket(AF_INET, SOCK_STREAM, 0); - assert(sockfd >=0 ); - server = gethostbyname("localhost"); - assert(server); + int sockfd = socket (AF_INET, SOCK_STREAM, 0); + assert (sockfd >= 0); + server = gethostbyname ("localhost"); + assert (server); - bzero((char *) &serv_addr, sizeof(serv_addr)); + bzero (&serv_addr, sizeof serv_addr); serv_addr.sin_family = AF_INET; - bcopy((char *)server->h_addr, - (char *)&serv_addr.sin_addr.s_addr, - server->h_length); - serv_addr.sin_port = htons(portno); + bcopy (server->h_addr, &serv_addr.sin_addr.s_addr, server->h_length); + serv_addr.sin_port = htons (portno); - if (connect(sockfd,(struct sockaddr *) &serv_addr,sizeof(serv_addr)) < 0) - assert(0); + int rc = connect (sockfd, (struct sockaddr *) &serv_addr, sizeof serv_addr); + assert (rc == 0); int nodelay = 1; - int rc = setsockopt (sockfd, IPPROTO_TCP, TCP_NODELAY, (char*) &nodelay, - sizeof (int)); - assert(rc == 0); - + rc = setsockopt (sockfd, IPPROTO_TCP, TCP_NODELAY, (char*) &nodelay, + sizeof nodelay); + assert (rc == 0); return sockfd; } -void tcp_client_write(int sockfd, const void *buf, int buf_len){ - assert(buf); - int n = write(sockfd, buf, buf_len); - assert(n >= 0); +void tcp_client_write (int sockfd, const void *buf, int buf_len) +{ + assert (buf); + int n = write (sockfd, buf, buf_len); + assert (n >= 0); } -void tcp_client_read(int sockfd){ +void tcp_client_read (int sockfd) +{ struct timeval tm; tm.tv_sec = 1; tm.tv_usec = 0; fd_set r; - int sr; - char buffer[16]; + char buffer [16]; - FD_ZERO(&r); - FD_SET(sockfd, &r); + FD_ZERO (&r); + FD_SET (sockfd, &r); - if ((sr = select(sockfd + 1, &r, NULL, NULL, &tm)) <= 0) - { - assert(0); - } + int sr = select (sockfd + 1, &r, NULL, NULL, &tm); + assert (sr > 0); - int n = read(sockfd, buffer, 16); - assert(n>0); - assert(memcmp(buffer, test_str, strlen(test_str)) == 0); + int n = read (sockfd, buffer, 16); + assert (n > 0); + assert (memcmp (buffer, test_str, strlen (test_str)) == 0); } - -void tcp_client_close(int sockfd){ - close(sockfd); +void tcp_client_close (int sockfd) +{ + close (sockfd); } - -int main(){ +int main () +{ fprintf (stderr, "test_raw_sock running...\n"); zmq_msg_t message; @@ -112,56 +106,49 @@ int main(){ void *ctx = zmq_init (1); assert (ctx); - int raw_sock = 1, rc = 0; void *sb = zmq_socket (ctx, ZMQ_ROUTER); assert (sb); - rc = zmq_setsockopt( sb, ZMQ_ROUTER_RAW_SOCK, &raw_sock, sizeof(int)); - assert(rc == 0); + + int raw_sock = 1; + int rc = zmq_setsockopt (sb, ZMQ_ROUTER_RAW_SOCK, &raw_sock, sizeof raw_sock); + assert (rc == 0); rc = zmq_bind (sb, "tcp://127.0.0.1:5555"); assert (rc == 0); - int sock_fd = tcp_client(); - assert(sock_fd >= 0); + int sock_fd = tcp_client (); + assert (sock_fd >= 0); // =================== - zmq_msg_init(&message); - zmq_msg_init(&id); + zmq_msg_init (&message); + zmq_msg_init (&id); assert (rc == 0); zmq_pollitem_t items [] = { { sb, 0, ZMQ_POLLIN, 0 }, }; - tcp_client_write(sock_fd, test_str, strlen(test_str)); + tcp_client_write (sock_fd, test_str, strlen (test_str)); zmq_poll (items, 1, 500); - if (items [0].revents & ZMQ_POLLIN) { - int n = zmq_msg_recv (&id, sb, 0); - assert(n > 0); - n = zmq_msg_recv (&message, sb, 0); - assert(n > 0); - assert(memcmp(zmq_msg_data (&message), test_str, strlen(test_str)) == 0); - }else{ - assert(0); - } + assert (items [0].revents & ZMQ_POLLIN); + int n = zmq_msg_recv (&id, sb, 0); + assert (n > 0); + n = zmq_msg_recv (&message, sb, 0); + assert (n > 0); + assert (memcmp (zmq_msg_data (&message), test_str, strlen (test_str)) == 0); zmq_msg_send (&id, sb, ZMQ_SNDMORE); - zmq_msg_send (&message, sb, ZMQ_SNDMORE);// SNDMORE option is ignored + zmq_msg_send (&message, sb, ZMQ_SNDMORE); // SNDMORE option is ignored - tcp_client_read(sock_fd); - tcp_client_close(sock_fd); + tcp_client_read (sock_fd); + tcp_client_close (sock_fd); - zmq_msg_close(&id); - zmq_msg_close(&message); + zmq_msg_close (&id); + zmq_msg_close (&message); - - zmq_close(sb); - zmq_term(ctx); + zmq_close (sb); + zmq_term (ctx); fprintf (stderr, "test_raw_sock PASSED.\n"); return 0; } - - - -