diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp index 6ea310c7..f68a909b 100644 --- a/src/pgm_receiver.cpp +++ b/src/pgm_receiver.cpp @@ -61,12 +61,12 @@ zmq::pgm_receiver_t::~pgm_receiver_t () delete decoder; } -int zmq::pgm_receiver_t::init (const char *network_) +int zmq::pgm_receiver_t::init (bool udp_encapsulation_, const char *network_) { decoder = new zmq_decoder_t; zmq_assert (decoder); - return pgm_socket.init (network_); + return pgm_socket.init (udp_encapsulation_, network_); } void zmq::pgm_receiver_t::plug (i_inout *inout_) diff --git a/src/pgm_receiver.hpp b/src/pgm_receiver.hpp index 53d53402..05b27e24 100644 --- a/src/pgm_receiver.hpp +++ b/src/pgm_receiver.hpp @@ -44,7 +44,7 @@ namespace zmq const char *session_name_); ~pgm_receiver_t (); - int init (const char *network_); + int init (bool udp_encapsulation_, const char *network_); void reconnect (); // i_engine interface implementation. diff --git a/src/pgm_sender.cpp b/src/pgm_sender.cpp index 9b1cef42..423865b0 100644 --- a/src/pgm_sender.cpp +++ b/src/pgm_sender.cpp @@ -59,9 +59,9 @@ zmq::pgm_sender_t::pgm_sender_t (io_thread_t *parent_, } -int zmq::pgm_sender_t::init (const char *network_) +int zmq::pgm_sender_t::init (bool udp_encapsulation_, const char *network_) { - return pgm_socket.init (network_); + return pgm_socket.init (udp_encapsulation_, network_); } void zmq::pgm_sender_t::plug (i_inout *inout_) @@ -157,7 +157,7 @@ void zmq::pgm_sender_t::out_event () // We can write all data or 0 which means rate limit reached. if (write_size - write_pos != nbytes && nbytes != 0) { - zmq_log (1, "write_size - write_pos %i, nbytes %i, %s(%i)", + zmq_log (2, "write_size - write_pos %i, nbytes %i, %s(%i)", (int)(write_size - write_pos), (int)nbytes, __FILE__, __LINE__); assert (false); } @@ -180,11 +180,9 @@ void zmq::pgm_sender_t::out_event () size_t zmq::pgm_sender_t::write_one_pkt_with_offset (unsigned char *data_, size_t size_, uint16_t offset_) { - zmq_log (1, "data_size %i, first message offset %i, %s(%i)\n", + zmq_log (4, "data_size %i, first message offset %i, %s(%i)\n", (int) size_, offset_, __FILE__, __LINE__); - std::cout << std::flush; - // Put offset information in the buffer. put_uint16 (data_, offset_); diff --git a/src/pgm_sender.hpp b/src/pgm_sender.hpp index 80be8d4d..8fdda6cc 100644 --- a/src/pgm_sender.hpp +++ b/src/pgm_sender.hpp @@ -42,7 +42,7 @@ namespace zmq const char *session_name_); ~pgm_sender_t (); - int init (const char *network_); + int init (bool udp_encapsulation_, const char *network_); // i_engine interface implementation. void plug (struct i_inout *inout_); diff --git a/src/pgm_socket.cpp b/src/pgm_socket.cpp index 315b43ea..8ceff6c7 100644 --- a/src/pgm_socket.cpp +++ b/src/pgm_socket.cpp @@ -68,24 +68,12 @@ zmq::pgm_socket_t::pgm_socket_t (bool receiver_, const options_t &options_) : } -int zmq::pgm_socket_t::init (const char *network_) +int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_) { - // Check if we are encapsulating into UDP, natwork string has to - // start with udp:. - const char *network_ptr = network_; - - if (strlen (network_) >= 4 && network_ [0] == 'u' && - network_ [1] == 'd' && network_ [2] == 'p' && - network_ [3] == ':') { - - // Shift interface_ptr after ':'. - network_ptr += 4; - - udp_encapsulation = true; - } + udp_encapsulation = udp_encapsulation_; // Parse port number. - const char *port_delim = strchr (network_ptr, ':'); + const char *port_delim = strchr (network_, ':'); if (!port_delim) { errno = EINVAL; return -1; @@ -93,20 +81,13 @@ int zmq::pgm_socket_t::init (const char *network_) port_number = atoi (port_delim + 1); - // Store interface string. - if (port_delim <= network_ptr) { - errno = EINVAL; - return -1; - } - - if (port_delim - network_ptr >= (int) sizeof (network) - 1) { + if (port_delim - network_ >= (int) sizeof (network) - 1) { errno = EINVAL; return -1; } memset (network, '\0', sizeof (network)); - memcpy (network, network_ptr, port_delim - network_ptr); - + memcpy (network, network_, port_delim - network_); zmq_log (1, "parsed: network %s, port %i, udp encaps. %s, %s(%i)\n", network, port_number, udp_encapsulation ? "yes" : "no", @@ -364,7 +345,7 @@ int zmq::pgm_socket_t::open_transport (void) return -1; } - zmq_log (1, "Preallocated %i slices in TX window. %s(%i)\n", + zmq_log (2, "Preallocated %i slices in TX window. %s(%i)\n", to_preallocate, __FILE__, __LINE__); // Set interval of background SPM packets [us]. @@ -611,7 +592,7 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_) // Catch the rest of the errors. if (nbytes_rec <= 0) { - zmq_log (1, "received %i B, errno %i, %s(%i)", (int)nbytes_rec, + zmq_log (2, "received %i B, errno %i, %s(%i)", (int)nbytes_rec, errno, __FILE__, __LINE__); errno_assert (nbytes_rec > 0); } diff --git a/src/pgm_socket.hpp b/src/pgm_socket.hpp index 632288d9..fe4468b6 100644 --- a/src/pgm_socket.hpp +++ b/src/pgm_socket.hpp @@ -52,7 +52,7 @@ namespace zmq ~pgm_socket_t (); // Initialize PGM network structures (GSI, GSRs). - int init (const char *network_); + int init (bool udp_encapsulation_, const char *network_); // Open PGM transport. Parameters are the same as in constructor. int open_transport (void); diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 88ba43f7..570be9e8 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -208,7 +208,7 @@ int zmq::socket_base_t::bind (const char *addr_) } #if defined ZMQ_HAVE_OPENPGM - if (addr_type == "pgm") { + if (addr_type == "pgm" || addr_type == "udp") { // In the case of PGM bind behaves the same like connect. return connect (addr_); } @@ -287,7 +287,12 @@ int zmq::socket_base_t::connect (const char *addr_) } #if defined ZMQ_HAVE_OPENPGM - if (addr_type == "pgm") { + if (addr_type == "pgm" || addr_type == "udp") { + + // For udp, pgm transport with udp encapsulation is used. + bool udp_encapsulation = false; + if (addr_type == "udp") + udp_encapsulation = true; switch (type) { @@ -298,7 +303,7 @@ int zmq::socket_base_t::connect (const char *addr_) new pgm_sender_t (choose_io_thread (options.affinity), options, session_name.c_str ()); - int rc = pgm_sender->init (addr_args.c_str ()); + int rc = pgm_sender->init (udp_encapsulation, addr_args.c_str ()); if (rc != 0) { delete pgm_sender; return -1; @@ -320,7 +325,7 @@ int zmq::socket_base_t::connect (const char *addr_) new pgm_receiver_t (choose_io_thread (options.affinity), options, session_name.c_str ()); - int rc = pgm_receiver->init (addr_args.c_str ()); + int rc = pgm_receiver->init (udp_encapsulation, addr_args.c_str ()); if (rc != 0) { delete pgm_receiver; return -1;