Ensure MQTT_CMD_SUBACK is received before calling mg_mqtt_pub in tests

This commit is contained in:
James Hilliard 2022-04-07 17:31:17 -06:00
parent a1ec179229
commit e4ab9a4130
3 changed files with 42 additions and 8 deletions

View File

@ -4812,7 +4812,7 @@ uint64_t mg_millis(void) {
#endif #endif
return ((uint64_t) ts.tv_sec * 1000 + (uint64_t) ts.tv_nsec / 1000000); return ((uint64_t) ts.tv_sec * 1000 + (uint64_t) ts.tv_nsec / 1000000);
#else #else
return time(NULL) * 1000; return (uint64_t) (time(NULL) * 1000);
#endif #endif
} }
#endif #endif

View File

@ -111,7 +111,7 @@ uint64_t mg_millis(void) {
#endif #endif
return ((uint64_t) ts.tv_sec * 1000 + (uint64_t) ts.tv_nsec / 1000000); return ((uint64_t) ts.tv_sec * 1000 + (uint64_t) ts.tv_nsec / 1000000);
#else #else
return time(NULL) * 1000; return (uint64_t) (time(NULL) * 1000);
#endif #endif
} }
#endif #endif

View File

@ -333,10 +333,25 @@ static void test_sntp(void) {
ASSERT(mg_sntp_parse(NULL, 0) == -1); ASSERT(mg_sntp_parse(NULL, 0) == -1);
} }
struct mqtt_data {
char *buf;
int subscribed;
int published;
};
static void mqtt_cb(struct mg_connection *c, int ev, void *evd, void *fnd) { static void mqtt_cb(struct mg_connection *c, int ev, void *evd, void *fnd) {
char *buf = (char *) fnd; struct mqtt_data *test_data = (struct mqtt_data *) fnd;
char *buf = test_data->buf;
if (ev == MG_EV_MQTT_OPEN) { if (ev == MG_EV_MQTT_OPEN) {
buf[0] = *(int *) evd == 0 ? 'X' : 'Y'; buf[0] = *(int *) evd == 0 ? 'X' : 'Y';
} else if (ev == MG_EV_MQTT_CMD) {
struct mg_mqtt_message *mm = (struct mg_mqtt_message *) evd;
if (mm->cmd == MQTT_CMD_SUBACK) {
test_data->subscribed = 1;
}
if (mm->cmd == MQTT_CMD_PUBACK) {
test_data->published = 1;
}
} else if (ev == MG_EV_MQTT_MSG) { } else if (ev == MG_EV_MQTT_MSG) {
struct mg_mqtt_message *mm = (struct mg_mqtt_message *) evd; struct mg_mqtt_message *mm = (struct mg_mqtt_message *) evd;
sprintf(buf + 1, "%.*s/%.*s", (int) mm->topic.len, mm->topic.ptr, sprintf(buf + 1, "%.*s/%.*s", (int) mm->topic.len, mm->topic.ptr,
@ -347,10 +362,12 @@ static void mqtt_cb(struct mg_connection *c, int ev, void *evd, void *fnd) {
static void test_mqtt(void) { static void test_mqtt(void) {
char buf[50] = {0}; char buf[50] = {0};
struct mqtt_data test_data = {buf, 0, 0};
struct mg_mgr mgr; struct mg_mgr mgr;
struct mg_str topic = mg_str("x/f12"), data = mg_str("hi"); struct mg_str topic = mg_str("x/f12"), data = mg_str("hi");
struct mg_connection *c; struct mg_connection *c;
struct mg_mqtt_opts opts; struct mg_mqtt_opts opts;
char rnd[9], client_id[16];
// const char *url = "mqtt://mqtt.eclipse.org:1883"; // const char *url = "mqtt://mqtt.eclipse.org:1883";
const char *url = "mqtt://broker.hivemq.com:1883"; const char *url = "mqtt://broker.hivemq.com:1883";
int i; int i;
@ -363,32 +380,49 @@ static void test_mqtt(void) {
} }
// Connect with empty client ID // Connect with empty client ID
c = mg_mqtt_connect(&mgr, url, NULL, mqtt_cb, buf); c = mg_mqtt_connect(&mgr, url, NULL, mqtt_cb, &test_data);
for (i = 0; i < 200 && buf[0] == 0; i++) mg_mgr_poll(&mgr, 10); for (i = 0; i < 300 && buf[0] == 0; i++) mg_mgr_poll(&mgr, 10);
if (buf[0] != 'X') MG_INFO(("[%s]", buf)); if (buf[0] != 'X') MG_INFO(("[%s]", buf));
ASSERT(buf[0] == 'X'); ASSERT(buf[0] == 'X');
ASSERT(test_data.subscribed == 0);
mg_mqtt_sub(c, topic, 1); mg_mqtt_sub(c, topic, 1);
for (i = 0; i < 500 && test_data.subscribed == 0; i++) mg_mgr_poll(&mgr, 10);
ASSERT(test_data.subscribed == 1);
ASSERT(test_data.published == 0);
mg_mqtt_pub(c, topic, data, 1, false); mg_mqtt_pub(c, topic, data, 1, false);
for (i = 0; i < 300 && buf[1] == 0; i++) mg_mgr_poll(&mgr, 10); for (i = 0; i < 500 && test_data.published == 0; i++) mg_mgr_poll(&mgr, 10);
ASSERT(test_data.published == 1);
for (i = 0; i < 500 && buf[1] == 0; i++) mg_mgr_poll(&mgr, 10);
if (strcmp(buf, "Xx/f12/hi") != 0) MG_INFO(("[%s]", buf)); if (strcmp(buf, "Xx/f12/hi") != 0) MG_INFO(("[%s]", buf));
ASSERT(strcmp(buf, "Xx/f12/hi") == 0); ASSERT(strcmp(buf, "Xx/f12/hi") == 0);
// Set params // Set params
test_data.subscribed = 0;
test_data.published = 0;
memset(buf, 0, sizeof(buf)); memset(buf, 0, sizeof(buf));
memset(&opts, 0, sizeof(opts)); memset(&opts, 0, sizeof(opts));
opts.clean = true; opts.clean = true;
opts.will_qos = 1; opts.will_qos = 1;
opts.will_retain = true; opts.will_retain = true;
opts.keepalive = 20; opts.keepalive = 20;
opts.client_id = mg_str("mg_client"); mg_random(rnd, sizeof(rnd));
mg_base64_encode((unsigned char *) rnd, sizeof(rnd), client_id);
client_id[sizeof(client_id) - 1] = '\0';
opts.client_id = mg_str(client_id);
opts.will_topic = mg_str("mg_will_topic"); opts.will_topic = mg_str("mg_will_topic");
opts.will_message = mg_str("mg_will_messsage"); opts.will_message = mg_str("mg_will_messsage");
c = mg_mqtt_connect(&mgr, url, &opts, mqtt_cb, buf); c = mg_mqtt_connect(&mgr, url, &opts, mqtt_cb, &test_data);
for (i = 0; i < 300 && buf[0] == 0; i++) mg_mgr_poll(&mgr, 10); for (i = 0; i < 300 && buf[0] == 0; i++) mg_mgr_poll(&mgr, 10);
if (buf[0] != 'X') MG_INFO(("[%s]", buf)); if (buf[0] != 'X') MG_INFO(("[%s]", buf));
ASSERT(buf[0] == 'X'); ASSERT(buf[0] == 'X');
ASSERT(test_data.subscribed == 0);
mg_mqtt_sub(c, topic, 1); mg_mqtt_sub(c, topic, 1);
for (i = 0; i < 500 && test_data.subscribed == 0; i++) mg_mgr_poll(&mgr, 10);
ASSERT(test_data.subscribed == 1);
ASSERT(test_data.published == 0);
mg_mqtt_pub(c, topic, data, 1, false); mg_mqtt_pub(c, topic, data, 1, false);
for (i = 0; i < 500 && test_data.published == 0; i++) mg_mgr_poll(&mgr, 10);
ASSERT(test_data.published == 1);
for (i = 0; i < 500 && buf[1] == 0; i++) mg_mgr_poll(&mgr, 10); for (i = 0; i < 500 && buf[1] == 0; i++) mg_mgr_poll(&mgr, 10);
if (strcmp(buf, "Xx/f12/hi") != 0) MG_INFO(("[%s]", buf)); if (strcmp(buf, "Xx/f12/hi") != 0) MG_INFO(("[%s]", buf));
ASSERT(strcmp(buf, "Xx/f12/hi") == 0); ASSERT(strcmp(buf, "Xx/f12/hi") == 0);