/* SPDX-License-Identifier: MPL-2.0 */

#include "testutil.hpp"
#include "testutil_unity.hpp"

SETUP_TEARDOWN_TESTCONTEXT

void test ()
{
    //  Create a publisher
    void *pub = test_context_socket (ZMQ_XPUB);

    int hwm = 2000;
    TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (pub, ZMQ_SNDHWM, &hwm, 4));

    TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (pub, "inproc://soname"));

    //  set pub socket options
    int wait = 1;
    TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (pub, ZMQ_XPUB_NODROP, &wait, 4));

    //  Create a subscriber
    void *sub = test_context_socket (ZMQ_SUB);
    TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub, "inproc://soname"));

    //  Subscribe for all messages.
    TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (sub, ZMQ_SUBSCRIBE, "", 0));

    //  we must wait for the subscription to be processed here, otherwise some
    //  or all published messages might be lost
    recv_string_expect_success (pub, "\1", 0);

    int hwmlimit = hwm - 1;
    int send_count = 0;

    //  Send an empty message
    for (int i = 0; i < hwmlimit; i++) {
        TEST_ASSERT_SUCCESS_ERRNO (zmq_send (pub, NULL, 0, 0));
        send_count++;
    }

    int recv_count = 0;
    do {
        //  Receive the message in the subscriber
        int rc = zmq_recv (sub, NULL, 0, 0);
        if (rc == -1) {
            TEST_ASSERT_EQUAL_INT (EAGAIN, errno);
            break;
        }
        TEST_ASSERT_EQUAL_INT (0, rc);
        recv_count++;

        if (recv_count == 1) {
            const int sub_rcvtimeo = 250;
            TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (
              sub, ZMQ_RCVTIMEO, &sub_rcvtimeo, sizeof (sub_rcvtimeo)));
        }

    } while (true);

    TEST_ASSERT_EQUAL_INT (send_count, recv_count);

    //  Now test real blocking behavior
    //  Set a timeout, default is infinite
    int timeout = 0;
    TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (pub, ZMQ_SNDTIMEO, &timeout, 4));

    send_count = 0;
    recv_count = 0;
    hwmlimit = hwm;

    //  Send an empty message until we get an error, which must be EAGAIN
    while (zmq_send (pub, "", 0, 0) == 0)
        send_count++;
    TEST_ASSERT_EQUAL_INT (EAGAIN, errno);

    if (send_count > 0) {
        //  Receive first message with blocking
        TEST_ASSERT_SUCCESS_ERRNO (zmq_recv (sub, NULL, 0, 0));
        recv_count++;

        while (zmq_recv (sub, NULL, 0, ZMQ_DONTWAIT) == 0)
            recv_count++;
    }

    TEST_ASSERT_EQUAL_INT (send_count, recv_count);

    //  Clean up.
    test_context_socket_close (pub);
    test_context_socket_close (sub);
}

int main ()
{
    setup_test_environment ();
    UNITY_BEGIN ();
    RUN_TEST (test);
    return UNITY_END ();
}