0
0
mirror of https://github.com/zeromq/libzmq.git synced 2024-12-27 07:31:03 +08:00

Merge pull request #645 from hintjens/master

Updated libzmq to match RFCs
This commit is contained in:
Martin Hurton 2013-09-06 00:54:51 -07:00
commit 498dc37988
23 changed files with 176 additions and 281 deletions

1
.gitignore vendored
View File

@ -23,6 +23,7 @@ autom4te.cache
tools/curve_keygen.o
tools/curve_keygen
tests/test_term_endpoint
tests/test_system
tests/test_monitor
tests/test_last_endpoint
tests/test_pair_inproc

View File

@ -578,6 +578,7 @@ endif()
enable_testing()
set(tests
test_system
test_connect_delay
test_connect_resolve
test_ctx_options
@ -612,7 +613,6 @@ if(NOT WIN32)
list(APPEND tests
test_monitor
test_pair_ipc
test_raw_sock
test_reqrep_ipc
test_stream)
endif()

View File

@ -50,18 +50,18 @@ zmq::curve_client_t::~curve_client_t ()
{
}
int zmq::curve_client_t::next_handshake_message (msg_t *msg_)
int zmq::curve_client_t::next_handshake_command (msg_t *msg_)
{
int rc = 0;
switch (state) {
case send_hello:
rc = hello_msg (msg_);
rc = produce_hello (msg_);
if (rc == 0)
state = expect_welcome;
break;
case send_initiate:
rc = initiate_msg (msg_);
rc = produce_initiate (msg_);
if (rc == 0)
state = expect_ready;
break;
@ -72,7 +72,7 @@ int zmq::curve_client_t::next_handshake_message (msg_t *msg_)
return rc;
}
int zmq::curve_client_t::process_handshake_message (msg_t *msg_)
int zmq::curve_client_t::process_handshake_command (msg_t *msg_)
{
int rc = 0;
@ -138,7 +138,7 @@ int zmq::curve_client_t::encode (msg_t *msg_)
uint8_t *message = static_cast <uint8_t *> (msg_->data ());
memcpy (message, "MESSAGE\0", 8);
memcpy (message, "\x07MESSAGE", 8);
memcpy (message + 8, &cn_nonce, 8);
memcpy (message + 16, message_box + crypto_box_BOXZEROBYTES,
mlen - crypto_box_BOXZEROBYTES);
@ -161,7 +161,7 @@ int zmq::curve_client_t::decode (msg_t *msg_)
}
const uint8_t *message = static_cast <uint8_t *> (msg_->data ());
if (memcmp (message, "MESSAGE\0", 8)) {
if (memcmp (message, "\x07MESSAGE", 8)) {
errno = EPROTO;
return -1;
}
@ -213,7 +213,7 @@ bool zmq::curve_client_t::is_handshake_complete () const
return state == connected;
}
int zmq::curve_client_t::hello_msg (msg_t *msg_)
int zmq::curve_client_t::produce_hello (msg_t *msg_)
{
uint8_t hello_nonce [crypto_box_NONCEBYTES];
uint8_t hello_plaintext [crypto_box_ZEROBYTES + 64];
@ -235,7 +235,7 @@ int zmq::curve_client_t::hello_msg (msg_t *msg_)
errno_assert (rc == 0);
uint8_t *hello = static_cast <uint8_t *> (msg_->data ());
memcpy (hello, "HELLO\0", 6);
memcpy (hello, "\x05HELLO", 6);
// CurveZMQ major and minor version numbers
memcpy (hello + 6, "\1\0", 2);
// Anti-amplification padding
@ -260,7 +260,7 @@ int zmq::curve_client_t::process_welcome (msg_t *msg_)
}
const uint8_t * welcome = static_cast <uint8_t *> (msg_->data ());
if (memcmp (welcome, "WELCOME\0", 8)) {
if (memcmp (welcome, "\x07WELCOME", 8)) {
errno = EPROTO;
return -1;
}
@ -294,7 +294,7 @@ int zmq::curve_client_t::process_welcome (msg_t *msg_)
return 0;
}
int zmq::curve_client_t::initiate_msg (msg_t *msg_)
int zmq::curve_client_t::produce_initiate (msg_t *msg_)
{
uint8_t vouch_nonce [crypto_box_NONCEBYTES];
uint8_t vouch_plaintext [crypto_box_ZEROBYTES + 32];
@ -351,7 +351,7 @@ int zmq::curve_client_t::initiate_msg (msg_t *msg_)
uint8_t *initiate = static_cast <uint8_t *> (msg_->data ());
memcpy (initiate, "INITIATE\0", 9);
memcpy (initiate, "\x08INITIATE", 9);
// Cookie provided by the server in the WELCOME command
memcpy (initiate + 9, cn_cookie, 96);
// Short nonce, prefixed by "CurveZMQINITIATE"
@ -373,7 +373,7 @@ int zmq::curve_client_t::process_ready (msg_t *msg_)
}
const uint8_t *ready = static_cast <uint8_t *> (msg_->data ());
if (memcmp (ready, "READY\0", 6)) {
if (memcmp (ready, "\x05READY", 6)) {
errno = EPROTO;
return -1;
}

View File

@ -50,8 +50,8 @@ namespace zmq
virtual ~curve_client_t ();
// mechanism implementation
virtual int next_handshake_message (msg_t *msg_);
virtual int process_handshake_message (msg_t *msg_);
virtual int next_handshake_command (msg_t *msg_);
virtual int process_handshake_command (msg_t *msg_);
virtual int encode (msg_t *msg_);
virtual int decode (msg_t *msg_);
virtual bool is_handshake_complete () const;
@ -96,9 +96,9 @@ namespace zmq
// Nonce
uint64_t cn_nonce;
int hello_msg (msg_t *msg_);
int produce_hello (msg_t *msg_);
int process_welcome (msg_t *msg_);
int initiate_msg (msg_t *msg_);
int produce_initiate (msg_t *msg_);
int process_ready (msg_t *msg_);
};

View File

@ -54,18 +54,18 @@ zmq::curve_server_t::~curve_server_t ()
{
}
int zmq::curve_server_t::next_handshake_message (msg_t *msg_)
int zmq::curve_server_t::next_handshake_command (msg_t *msg_)
{
int rc = 0;
switch (state) {
case send_welcome:
rc = welcome_msg (msg_);
rc = produce_welcome (msg_);
if (rc == 0)
state = expect_initiate;
break;
case send_ready:
rc = ready_msg (msg_);
rc = produce_ready (msg_);
if (rc == 0)
state = connected;
break;
@ -77,7 +77,7 @@ int zmq::curve_server_t::next_handshake_message (msg_t *msg_)
return rc;
}
int zmq::curve_server_t::process_handshake_message (msg_t *msg_)
int zmq::curve_server_t::process_handshake_command (msg_t *msg_)
{
int rc = 0;
@ -143,7 +143,7 @@ int zmq::curve_server_t::encode (msg_t *msg_)
uint8_t *message = static_cast <uint8_t *> (msg_->data ());
memcpy (message, "MESSAGE\0", 8);
memcpy (message, "\x07MESSAGE", 8);
memcpy (message + 8, &cn_nonce, 8);
memcpy (message + 16, message_box + crypto_box_BOXZEROBYTES,
mlen - crypto_box_BOXZEROBYTES);
@ -166,7 +166,7 @@ int zmq::curve_server_t::decode (msg_t *msg_)
}
const uint8_t *message = static_cast <uint8_t *> (msg_->data ());
if (memcmp (message, "MESSAGE\0", 8)) {
if (memcmp (message, "\x07MESSAGE", 8)) {
errno = EPROTO;
return -1;
}
@ -238,7 +238,7 @@ int zmq::curve_server_t::process_hello (msg_t *msg_)
}
const uint8_t * const hello = static_cast <uint8_t *> (msg_->data ());
if (memcmp (hello, "HELLO\0", 6)) {
if (memcmp (hello, "\x05HELLO", 6)) {
errno = EPROTO;
return -1;
}
@ -276,7 +276,7 @@ int zmq::curve_server_t::process_hello (msg_t *msg_)
return rc;
}
int zmq::curve_server_t::welcome_msg (msg_t *msg_)
int zmq::curve_server_t::produce_welcome (msg_t *msg_)
{
uint8_t cookie_nonce [crypto_secretbox_NONCEBYTES];
uint8_t cookie_plaintext [crypto_secretbox_ZEROBYTES + 64];
@ -329,7 +329,7 @@ int zmq::curve_server_t::welcome_msg (msg_t *msg_)
errno_assert (rc == 0);
uint8_t * const welcome = static_cast <uint8_t *> (msg_->data ());
memcpy (welcome, "WELCOME\0", 8);
memcpy (welcome, "\x07WELCOME", 8);
memcpy (welcome + 8, welcome_nonce + 8, 16);
memcpy (welcome + 24, welcome_ciphertext + crypto_box_BOXZEROBYTES, 144);
@ -344,7 +344,7 @@ int zmq::curve_server_t::process_initiate (msg_t *msg_)
}
const uint8_t *initiate = static_cast <uint8_t *> (msg_->data ());
if (memcmp (initiate, "INITIATE\0", 9)) {
if (memcmp (initiate, "\x08INITIATE", 9)) {
errno = EPROTO;
return -1;
}
@ -447,7 +447,7 @@ int zmq::curve_server_t::process_initiate (msg_t *msg_)
clen - crypto_box_ZEROBYTES - 96);
}
int zmq::curve_server_t::ready_msg (msg_t *msg_)
int zmq::curve_server_t::produce_ready (msg_t *msg_)
{
uint8_t ready_nonce [crypto_box_NONCEBYTES];
uint8_t ready_plaintext [crypto_box_ZEROBYTES + 256];
@ -482,7 +482,7 @@ int zmq::curve_server_t::ready_msg (msg_t *msg_)
uint8_t *ready = static_cast <uint8_t *> (msg_->data ());
memcpy (ready, "READY\0", 6);
memcpy (ready, "\x05READY", 6);
// Short nonce, prefixed by "CurveZMQREADY---"
memcpy (ready + 6, &cn_nonce, 8);
// Box [metadata](S'->C')

View File

@ -55,8 +55,8 @@ namespace zmq
virtual ~curve_server_t ();
// mechanism implementation
virtual int next_handshake_message (msg_t *msg_);
virtual int process_handshake_message (msg_t *msg_);
virtual int next_handshake_command (msg_t *msg_);
virtual int process_handshake_command (msg_t *msg_);
virtual int encode (msg_t *msg_);
virtual int decode (msg_t *msg_);
virtual int zap_msg_available ();
@ -104,9 +104,9 @@ namespace zmq
uint8_t cn_precom [crypto_box_BEFORENMBYTES];
int process_hello (msg_t *msg_);
int welcome_msg (msg_t *msg_);
int produce_welcome (msg_t *msg_);
int process_initiate (msg_t *msg_);
int ready_msg (msg_t *msg_);
int produce_ready (msg_t *msg_);
void send_zap_request (const uint8_t *key);
int receive_and_process_zap_reply ();

View File

@ -40,11 +40,11 @@ namespace zmq
virtual ~mechanism_t ();
// Prepare next handshake message that is to be sent to the peer.
virtual int next_handshake_message (msg_t *msg_) = 0;
// Prepare next handshake command that is to be sent to the peer.
virtual int next_handshake_command (msg_t *msg_) = 0;
// Process the handshake message received from the peer.
virtual int process_handshake_message (msg_t *msg_) = 0;
// Process the handshake command received from the peer.
virtual int process_handshake_command (msg_t *msg_) = 0;
virtual int encode (msg_t *msg_) { return 0; }

View File

@ -47,7 +47,8 @@ namespace zmq
// Message flags.
enum
{
more = 1,
more = 1, // Followed by more parts
command = 2, // Command frame (see ZMTP spec)
identity = 64,
shared = 128
};

View File

@ -53,7 +53,7 @@ zmq::null_mechanism_t::~null_mechanism_t ()
{
}
int zmq::null_mechanism_t::next_handshake_message (msg_t *msg_)
int zmq::null_mechanism_t::next_handshake_command (msg_t *msg_)
{
if (ready_command_sent) {
errno = EAGAIN;
@ -78,7 +78,7 @@ int zmq::null_mechanism_t::next_handshake_message (msg_t *msg_)
unsigned char *ptr = command_buffer;
// Add mechanism string
memcpy (ptr, "READY\0", 6);
memcpy (ptr, "\5READY", 6);
ptr += 6;
// Add socket type property
@ -104,7 +104,7 @@ int zmq::null_mechanism_t::next_handshake_message (msg_t *msg_)
return 0;
}
int zmq::null_mechanism_t::process_handshake_message (msg_t *msg_)
int zmq::null_mechanism_t::process_handshake_command (msg_t *msg_)
{
if (ready_command_received) {
errno = EPROTO;
@ -115,7 +115,7 @@ int zmq::null_mechanism_t::process_handshake_message (msg_t *msg_)
static_cast <unsigned char *> (msg_->data ());
size_t bytes_left = msg_->size ();
if (bytes_left < 6 || memcmp (ptr, "READY\0", 6)) {
if (bytes_left < 6 || memcmp (ptr, "\5READY", 6)) {
errno = EPROTO;
return -1;
}

View File

@ -39,8 +39,8 @@ namespace zmq
virtual ~null_mechanism_t ();
// mechanism implementation
virtual int next_handshake_message (msg_t *msg_);
virtual int process_handshake_message (msg_t *msg_);
virtual int next_handshake_command (msg_t *msg_);
virtual int process_handshake_command (msg_t *msg_);
virtual int zap_msg_available ();
virtual bool is_handshake_complete () const;

View File

@ -46,28 +46,28 @@ zmq::plain_mechanism_t::~plain_mechanism_t ()
{
}
int zmq::plain_mechanism_t::next_handshake_message (msg_t *msg_)
int zmq::plain_mechanism_t::next_handshake_command (msg_t *msg_)
{
int rc = 0;
switch (state) {
case sending_hello:
rc = hello_command (msg_);
rc = produce_hello (msg_);
if (rc == 0)
state = waiting_for_welcome;
break;
case sending_welcome:
rc = welcome_command (msg_);
rc = produce_welcome (msg_);
if (rc == 0)
state = waiting_for_initiate;
break;
case sending_initiate:
rc = initiate_command (msg_);
rc = produce_initiate (msg_);
if (rc == 0)
state = waiting_for_ready;
break;
case sending_ready:
rc = ready_command (msg_);
rc = produce_ready (msg_);
if (rc == 0)
state = ready;
break;
@ -78,28 +78,28 @@ int zmq::plain_mechanism_t::next_handshake_message (msg_t *msg_)
return rc;
}
int zmq::plain_mechanism_t::process_handshake_message (msg_t *msg_)
int zmq::plain_mechanism_t::process_handshake_command (msg_t *msg_)
{
int rc = 0;
switch (state) {
case waiting_for_hello:
rc = process_hello_command (msg_);
rc = process_hello (msg_);
if (rc == 0)
state = expecting_zap_reply? waiting_for_zap_reply: sending_welcome;
break;
case waiting_for_welcome:
rc = process_welcome_command (msg_);
rc = process_welcome (msg_);
if (rc == 0)
state = sending_initiate;
break;
case waiting_for_initiate:
rc = process_initiate_command (msg_);
rc = process_initiate (msg_);
if (rc == 0)
state = sending_ready;
break;
case waiting_for_ready:
rc = process_ready_command (msg_);
rc = process_ready (msg_);
if (rc == 0)
state = ready;
break;
@ -134,7 +134,7 @@ int zmq::plain_mechanism_t::zap_msg_available ()
return rc;
}
int zmq::plain_mechanism_t::hello_command (msg_t *msg_) const
int zmq::plain_mechanism_t::produce_hello (msg_t *msg_) const
{
const std::string username = options.plain_username;
zmq_assert (username.length () < 256);
@ -142,15 +142,15 @@ int zmq::plain_mechanism_t::hello_command (msg_t *msg_) const
const std::string password = options.plain_password;
zmq_assert (password.length () < 256);
const size_t command_size = 8 + 1 + username.length ()
const size_t command_size = 6 + 1 + username.length ()
+ 1 + password.length ();
const int rc = msg_->init_size (command_size);
errno_assert (rc == 0);
unsigned char *ptr = static_cast <unsigned char *> (msg_->data ());
memcpy (ptr, "HELLO ", 8);
ptr += 8;
memcpy (ptr, "\x05HELLO", 6);
ptr += 6;
*ptr++ = static_cast <unsigned char> (username.length ());
memcpy (ptr, username.c_str (), username.length ());
@ -164,17 +164,17 @@ int zmq::plain_mechanism_t::hello_command (msg_t *msg_) const
}
int zmq::plain_mechanism_t::process_hello_command (msg_t *msg_)
int zmq::plain_mechanism_t::process_hello (msg_t *msg_)
{
const unsigned char *ptr = static_cast <unsigned char *> (msg_->data ());
size_t bytes_left = msg_->size ();
if (bytes_left < 8 || memcmp (ptr, "HELLO ", 8)) {
if (bytes_left < 6 || memcmp (ptr, "\x05HELLO", 6)) {
errno = EPROTO;
return -1;
}
ptr += 8;
bytes_left -= 8;
ptr += 6;
bytes_left -= 6;
if (bytes_left < 1) {
errno = EPROTO;
@ -226,27 +226,27 @@ int zmq::plain_mechanism_t::process_hello_command (msg_t *msg_)
return 0;
}
int zmq::plain_mechanism_t::welcome_command (msg_t *msg_) const
int zmq::plain_mechanism_t::produce_welcome (msg_t *msg_) const
{
const int rc = msg_->init_size (8);
errno_assert (rc == 0);
memcpy (msg_->data (), "WELCOME ", 8);
memcpy (msg_->data (), "\x07WELCOME", 8);
return 0;
}
int zmq::plain_mechanism_t::process_welcome_command (msg_t *msg_)
int zmq::plain_mechanism_t::process_welcome (msg_t *msg_)
{
const unsigned char *ptr = static_cast <unsigned char *> (msg_->data ());
size_t bytes_left = msg_->size ();
if (bytes_left != 8 || memcmp (ptr, "WELCOME ", 8)) {
if (bytes_left != 8 || memcmp (ptr, "\x07WELCOME", 8)) {
errno = EPROTO;
return -1;
}
return 0;
}
int zmq::plain_mechanism_t::initiate_command (msg_t *msg_) const
int zmq::plain_mechanism_t::produce_initiate (msg_t *msg_) const
{
unsigned char * const command_buffer = (unsigned char *) malloc (512);
alloc_assert (command_buffer);
@ -254,8 +254,8 @@ int zmq::plain_mechanism_t::initiate_command (msg_t *msg_) const
unsigned char *ptr = command_buffer;
// Add mechanism string
memcpy (ptr, "INITIATE", 8);
ptr += 8;
memcpy (ptr, "\x08INITIATE", 9);
ptr += 9;
// Add socket type property
const char *socket_type = socket_type_string (options.type);
@ -278,19 +278,21 @@ int zmq::plain_mechanism_t::initiate_command (msg_t *msg_) const
return 0;
}
int zmq::plain_mechanism_t::process_initiate_command (msg_t *msg_)
int zmq::plain_mechanism_t::process_initiate (msg_t *msg_)
{
const unsigned char *ptr = static_cast <unsigned char *> (msg_->data ());
size_t bytes_left = msg_->size ();
if (bytes_left < 8 || memcmp (ptr, "INITIATE", 8)) {
if (bytes_left < 9 || memcmp (ptr, "\x08INITIATE", 9)) {
errno = EPROTO;
return -1;
}
return parse_metadata (ptr + 8, bytes_left - 8);
ptr += 9;
bytes_left -= 9;
return parse_metadata (ptr, bytes_left);
}
int zmq::plain_mechanism_t::ready_command (msg_t *msg_) const
int zmq::plain_mechanism_t::produce_ready (msg_t *msg_) const
{
unsigned char * const command_buffer = (unsigned char *) malloc (512);
alloc_assert (command_buffer);
@ -298,7 +300,7 @@ int zmq::plain_mechanism_t::ready_command (msg_t *msg_) const
unsigned char *ptr = command_buffer;
// Add command name
memcpy (ptr, "READY\0", 6);
memcpy (ptr, "\x05READY", 6);
ptr += 6;
// Add socket type property
@ -322,12 +324,12 @@ int zmq::plain_mechanism_t::ready_command (msg_t *msg_) const
return 0;
}
int zmq::plain_mechanism_t::process_ready_command (msg_t *msg_)
int zmq::plain_mechanism_t::process_ready (msg_t *msg_)
{
const unsigned char *ptr = static_cast <unsigned char *> (msg_->data ());
size_t bytes_left = msg_->size ();
if (bytes_left < 6 || memcmp (ptr, "READY\0", 6)) {
if (bytes_left < 6 || memcmp (ptr, "\x05READY", 6)) {
errno = EPROTO;
return -1;
}

View File

@ -39,8 +39,8 @@ namespace zmq
virtual ~plain_mechanism_t ();
// mechanism implementation
virtual int next_handshake_message (msg_t *msg_);
virtual int process_handshake_message (msg_t *msg_);
virtual int next_handshake_command (msg_t *msg_);
virtual int process_handshake_command (msg_t *msg_);
virtual int zap_msg_available ();
virtual bool is_handshake_complete () const;
@ -68,15 +68,15 @@ namespace zmq
state_t state;
int hello_command (msg_t *msg_) const;
int welcome_command (msg_t *msg_) const;
int initiate_command (msg_t *msg_) const;
int ready_command (msg_t *msg_) const;
int produce_hello (msg_t *msg_) const;
int produce_welcome (msg_t *msg_) const;
int produce_initiate (msg_t *msg_) const;
int produce_ready (msg_t *msg_) const;
int process_hello_command (msg_t *msg_);
int process_welcome_command (msg_t *msg);
int process_ready_command (msg_t *msg_);
int process_initiate_command (msg_t *msg_);
int process_hello (msg_t *msg_);
int process_welcome (msg_t *msg);
int process_ready (msg_t *msg_);
int process_initiate (msg_t *msg_);
void send_zap_request (const std::string &username,
const std::string &password);

View File

@ -395,7 +395,6 @@ bool zmq::stream_engine_t::handshake ()
{
zmq_assert (handshaking);
zmq_assert (greeting_bytes_read < greeting_size);
// Receive the greeting.
while (greeting_bytes_read < greeting_size) {
const int n = read (greeting_recv + greeting_bytes_read,
@ -492,8 +491,8 @@ bool zmq::stream_engine_t::handshake ()
insize = greeting_bytes_read;
// To allow for interoperability with peers that do not forward
// their subscriptions, we inject a phony subscription
// message into the incomming message stream.
// their subscriptions, we inject a phantom subscription message
// message into the incoming message stream.
if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB)
subscription_required = true;
}
@ -550,9 +549,8 @@ bool zmq::stream_engine_t::handshake ()
error ();
return false;
}
read_msg = &stream_engine_t::next_handshake_message;
write_msg = &stream_engine_t::process_handshake_message;
read_msg = &stream_engine_t::next_handshake_command;
write_msg = &stream_engine_t::process_handshake_command;
}
// Start polling for output if necessary.
@ -598,12 +596,13 @@ int zmq::stream_engine_t::write_identity (msg_t *msg_)
return 0;
}
int zmq::stream_engine_t::next_handshake_message (msg_t *msg_)
int zmq::stream_engine_t::next_handshake_command (msg_t *msg_)
{
zmq_assert (mechanism != NULL);
const int rc = mechanism->next_handshake_message (msg_);
const int rc = mechanism->next_handshake_command (msg_);
if (rc == 0) {
msg_->set_flags (msg_t::command);
if (mechanism->is_handshake_complete ())
mechanism_ready ();
}
@ -611,11 +610,10 @@ int zmq::stream_engine_t::next_handshake_message (msg_t *msg_)
return rc;
}
int zmq::stream_engine_t::process_handshake_message (msg_t *msg_)
int zmq::stream_engine_t::process_handshake_command (msg_t *msg_)
{
zmq_assert (mechanism != NULL);
const int rc = mechanism->process_handshake_message (msg_);
const int rc = mechanism->process_handshake_command (msg_);
if (rc == 0) {
if (mechanism->is_handshake_complete ())
mechanism_ready ();

View File

@ -96,8 +96,8 @@ namespace zmq
int read_identity (msg_t *msg_);
int write_identity (msg_t *msg_);
int next_handshake_message (msg_t *msg);
int process_handshake_message (msg_t *msg);
int next_handshake_command (msg_t *msg);
int process_handshake_command (msg_t *msg);
int pull_msg_from_session (msg_t *msg_);
int push_msg_to_session (msg_t *msg);
@ -171,8 +171,8 @@ namespace zmq
bool io_error;
// Indicates whether the engine is to inject a phony
// subscription message into the incomming stream.
// Indicates whether the engine is to inject a phantom
// subscription message into the incoming stream.
// Needed to support old peers.
bool subscription_required;

View File

@ -54,6 +54,8 @@ int zmq::v2_decoder_t::flags_ready ()
msg_flags = 0;
if (tmpbuf [0] & v2_protocol_t::more_flag)
msg_flags |= msg_t::more;
if (tmpbuf [0] & v2_protocol_t::command_flag)
msg_flags |= msg_t::command;
// The payload length is either one or eight bytes,
// depending on whether the 'large' bit is set.

View File

@ -42,6 +42,8 @@ void zmq::v2_encoder_t::message_ready ()
protocol_flags |= v2_protocol_t::more_flag;
if (in_progress->size () > 255)
protocol_flags |= v2_protocol_t::large_flag;
if (in_progress->flags () & msg_t::command)
protocol_flags |= v2_protocol_t::command_flag;
// Encode the message length. For messages less then 256 bytes,
// the length is encoded as 8-bit unsigned integer. For larger

View File

@ -30,7 +30,8 @@ namespace zmq
enum
{
more_flag = 1,
large_flag = 2
large_flag = 2,
command_flag = 4
};
};
}

View File

@ -3,7 +3,8 @@ INCLUDES = -I$(top_builddir)/include \
LDADD = $(top_builddir)/src/libzmq.la
noinst_PROGRAMS = test_pair_inproc \
noinst_PROGRAMS = test_system \
test_pair_inproc \
test_pair_tcp \
test_reqrep_inproc \
test_reqrep_tcp \
@ -44,6 +45,7 @@ noinst_PROGRAMS += test_shutdown_stress \
test_fork
endif
test_system_SOURCES = test_system.cpp
test_pair_inproc_SOURCES = test_pair_inproc.cpp testutil.hpp
test_pair_tcp_SOURCES = test_pair_tcp.cpp testutil.hpp
test_reqrep_inproc_SOURCES = test_reqrep_inproc.cpp testutil.hpp
@ -84,10 +86,6 @@ test_timeo_SOURCES = test_timeo.cpp
test_fork_SOURCES = test_fork.cpp
endif
# Deprecated test cases
noinst_PROGRAMS += test_raw_sock
test_raw_sock_SOURCES = test_raw_sock.cpp
# Run the test cases
TESTS = $(noinst_PROGRAMS)
XFAIL_TESTS = test_linger

View File

@ -1,166 +0,0 @@
/*
Copyright (c) 2007-2013 Contributors as noted in the AUTHORS file
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "../include/zmq.h"
#include <string.h>
#include "testutil.hpp"
// ZMTP protocol greeting structure
typedef unsigned char byte;
typedef struct {
byte signature [10]; // 0xFF 8*0x00 0x7F
byte version [2]; // 0x03 0x00 for ZMTP/3.0
byte mechanism [20]; // "NULL"
byte as_server;
byte filler [31];
} zmtp_greeting_t;
#define ZMTP_DEALER 5 // Socket type constants
#define ZMTP_ROUTER 6
// This is a greeting matching what 0MQ will send us; note the
// 8-byte size is set to 1 for backwards compatibility
static zmtp_greeting_t greeting
= { { 0xFF, 0, 0, 0, 0, 0, 0, 0, 1, 0x7F }, {3, 0}, { 'N', 'U', 'L', 'L'} };
int main (void)
{
setup_test_environment();
int rc;
// Set up our context and sockets
void *ctx = zmq_ctx_new ();
assert (ctx);
// We'll be using this socket in raw mode
void *router = zmq_socket (ctx, ZMQ_ROUTER);
assert (router);
int on = 1;
rc = zmq_setsockopt (router, ZMQ_ROUTER_RAW, &on, sizeof (on));
assert (rc == 0);
int zero = 0;
rc = zmq_setsockopt (router, ZMQ_LINGER, &zero, sizeof (zero));
assert (rc == 0);
rc = zmq_bind (router, "tcp://*:5555");
assert (rc == 0);
// We'll be using this socket as the other peer
void *dealer = zmq_socket (ctx, ZMQ_DEALER);
assert (dealer);
rc = zmq_setsockopt (dealer, ZMQ_LINGER, &zero, sizeof (zero));
assert (rc == 0);
rc = zmq_connect (dealer, "tcp://localhost:5555");
// Send a message on the dealer socket
rc = zmq_send (dealer, "Hello", 5, 0);
assert (rc == 5);
// First frame is identity
zmq_msg_t identity;
rc = zmq_msg_init (&identity);
assert (rc == 0);
rc = zmq_msg_recv (&identity, router, 0);
assert (rc > 0);
assert (zmq_msg_more (&identity));
// Second frame is greeting signature
byte buffer [255];
rc = zmq_recv (router, buffer, 255, 0);
assert (rc == 10);
assert (memcmp (buffer, greeting.signature, 10) == 0);
// Send our own protocol greeting
rc = zmq_msg_send (&identity, router, ZMQ_SNDMORE);
assert (rc > 0);
rc = zmq_send (router, &greeting, sizeof (greeting), 0);
assert (rc == sizeof (greeting));
// Now we expect the data from the DEALER socket
// First frame is, again, the identity of the connection
rc = zmq_msg_recv (&identity, router, 0);
assert (rc > 0);
assert (zmq_msg_more (&identity));
// Second frame contains the rest of greeting along with
// the Ready command
rc = zmq_recv (router, buffer, 255, 0);
assert (rc == 97);
// First two bytes are major and minor version numbers.
assert (buffer [0] == 3); // ZMTP/3.0
assert (buffer [1] == 0);
// Mechanism is "NULL"
assert (memcmp (buffer + 2, "NULL\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 22) == 0);
assert (memcmp (buffer + 54, "\0\51READY\0", 8) == 0);
assert (memcmp (buffer + 62, "\13Socket-Type\0\0\0\6DEALER", 22) == 0);
assert (memcmp (buffer + 84, "\10Identity\0\0\0\0", 13) == 0);
// Announce we are ready
memcpy (buffer, "\0\51READY\0", 8);
memcpy (buffer + 8, "\13Socket-Type\0\0\0\6ROUTER", 22);
memcpy (buffer + 30, "\10Identity\0\0\0\0", 13);
// Send Ready command
rc = zmq_msg_send (&identity, router, ZMQ_SNDMORE);
assert (rc > 0);
rc = zmq_send (router, buffer, 43, 0);
assert (rc == 43);
// Now we expect the data from the DEALER socket
// First frame is, again, the identity of the connection
rc = zmq_msg_recv (&identity, router, 0);
assert (rc > 0);
assert (zmq_msg_more (&identity));
// Third frame contains Hello message from DEALER
rc = zmq_recv (router, buffer, sizeof buffer, 0);
assert (rc == 7);
// Then we have a 5-byte message "Hello"
assert (buffer [0] == 0); // Flags = 0
assert (buffer [1] == 5); // Size = 5
assert (memcmp (buffer + 2, "Hello", 5) == 0);
// Send "World" back to DEALER
rc = zmq_msg_send (&identity, router, ZMQ_SNDMORE);
assert (rc > 0);
byte world [] = { 0, 5, 'W', 'o', 'r', 'l', 'd' };
rc = zmq_send (router, world, sizeof (world), 0);
assert (rc == sizeof (world));
// Expect response on DEALER socket
rc = zmq_recv (dealer, buffer, 255, 0);
assert (rc == 5);
assert (memcmp (buffer, "World", 5) == 0);
rc = zmq_close (dealer);
assert (rc == 0);
rc = zmq_close (router);
assert (rc == 0);
rc = zmq_ctx_term (ctx);
assert (rc == 0);
return 0;
}

View File

@ -17,12 +17,12 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "platform.hpp"
#include <string.h>
#include <stdlib.h>
#include "testutil.hpp"
#include "../include/zmq_utils.h"
#include "../src/z85_codec.hpp"
#include "../src/platform.hpp"
// Test keys from the zmq_curve man page
static char client_public [] = "Yne@$w-vo<fVvi]a<NY6T1ed:M$fCG*[IaLV{hID";
@ -53,7 +53,7 @@ static void zap_handler (void *ctx)
int size = zmq_recv (zap, client_key, 32, 0);
assert (size == 32);
char client_key_text [40];
char client_key_text [41];
Z85_encode (client_key_text, client_key, 32);
assert (streq (version, "1.0"));

View File

@ -38,7 +38,7 @@ typedef struct {
// 8-byte size is set to 1 for backwards compatibility
static zmtp_greeting_t greeting
= { { 0xFF, 0, 0, 0, 0, 0, 0, 0, 1, 0x7F }, {3, 0}, { 'N', 'U', 'L', 'L'} };
= { { 0xFF, 0, 0, 0, 0, 0, 0, 0, 1, 0x7F }, { 3, 0 }, { 'N', 'U', 'L', 'L'} };
static void
test_stream_to_dealer (void)
@ -98,7 +98,12 @@ test_stream_to_dealer (void)
// Second frame contains the rest of greeting along with
// the Ready command
rc = zmq_recv (stream, buffer, 255, 0);
int bytes_read = 0;
while (bytes_read < 97) {
rc = zmq_recv (stream, buffer + bytes_read, 255 - bytes_read, 0);
assert (rc >= 0);
bytes_read += rc;
}
assert (rc == 97);
// First two bytes are major and minor version numbers.
@ -106,13 +111,13 @@ test_stream_to_dealer (void)
assert (buffer [1] == 0);
// Mechanism is "NULL"
assert (memcmp (buffer + 2, "NULL\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 22) == 0);
assert (memcmp (buffer + 54, "\0\51READY\0", 8) == 0);
assert (memcmp (buffer + 2, "NULL\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20) == 0);
assert (memcmp (buffer + 54, "\4\51\5READY", 8) == 0);
assert (memcmp (buffer + 62, "\13Socket-Type\0\0\0\6DEALER", 22) == 0);
assert (memcmp (buffer + 84, "\10Identity\0\0\0\0", 13) == 0);
// Announce we are ready
memcpy (buffer, "\0\51READY\0", 8);
memcpy (buffer, "\4\51\5READY", 8);
memcpy (buffer + 8, "\13Socket-Type\0\0\0\6ROUTER", 22);
memcpy (buffer + 30, "\10Identity\0\0\0\0", 13);

51
tests/test_system.cpp Normal file
View File

@ -0,0 +1,51 @@
/*
Copyright (c) 2007-2013 Contributors as noted in the AUTHORS file
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "../include/zmq.h"
#include <sys/socket.h>
#include <netinet/in.h>
#include <unistd.h>
#include <assert.h>
// This test case stresses the system to shake out known configuration
// problems. We're not using libzmq here but direct system calls. Note
// that code may need wrapping to be properly portable.
int main (void)
{
// Check that we can create 1,000 sockets
int handle [1000];
int count;
for (count = 0; count < 1000; count++) {
handle [count] = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
if (handle [count] == -1) {
printf ("W: Only able to create %d sockets on this box\n", count);
printf ("I: Tune your system to increase maximum allowed file handles\n");
#if defined (ZMQ_HAVE_OSX)
printf ("I: On OS/X, run 'ulimit -n 1200' in bash");
#elseif defined (ZMQ_HAVE_LINUX)
printf ("I: On Linux, run 'ulimit -n 1200' in bash");
#endif
return -1;
}
}
// Release the socket handles
while (count)
close (handle [count--]);
}

View File

@ -56,7 +56,7 @@ int main (void)
int rc = crypto_box_keypair (public_key, secret_key);
assert (rc == 0);
char encoded [40];
char encoded [41];
Z85_encode (encoded, public_key, 32);
puts ("\n== CURVE PUBLIC KEY ==");
puts (encoded);