62#if LWIP_TCP && LWIP_CALLBACK_API
67#if !defined MQTT_DEBUG || defined __DOXYGEN__
68#define MQTT_DEBUG LWIP_DBG_OFF
/* Debug selectors for this module. Each one ORs the module-wide MQTT_DEBUG
 * value with one or two LWIP_DBG_* flags; elsewhere in this file they are
 * passed as the first argument of LWIP_DEBUGF (e.g. MQTT_DEBUG_TRACE,
 * MQTT_DEBUG_WARN). */
71#define MQTT_DEBUG_TRACE (MQTT_DEBUG | LWIP_DBG_TRACE)
72#define MQTT_DEBUG_STATE (MQTT_DEBUG | LWIP_DBG_STATE)
73#define MQTT_DEBUG_WARN (MQTT_DEBUG | LWIP_DBG_LEVEL_WARNING)
74#define MQTT_DEBUG_WARN_STATE (MQTT_DEBUG | LWIP_DBG_LEVEL_WARNING | LWIP_DBG_STATE)
75#define MQTT_DEBUG_SERIOUS (MQTT_DEBUG | LWIP_DBG_LEVEL_SERIOUS)
92enum mqtt_message_type {
93 MQTT_MSG_TYPE_CONNECT = 1,
94 MQTT_MSG_TYPE_CONNACK = 2,
95 MQTT_MSG_TYPE_PUBLISH = 3,
96 MQTT_MSG_TYPE_PUBACK = 4,
97 MQTT_MSG_TYPE_PUBREC = 5,
98 MQTT_MSG_TYPE_PUBREL = 6,
99 MQTT_MSG_TYPE_PUBCOMP = 7,
100 MQTT_MSG_TYPE_SUBSCRIBE = 8,
101 MQTT_MSG_TYPE_SUBACK = 9,
102 MQTT_MSG_TYPE_UNSUBSCRIBE = 10,
103 MQTT_MSG_TYPE_UNSUBACK = 11,
104 MQTT_MSG_TYPE_PINGREQ = 12,
105 MQTT_MSG_TYPE_PINGRESP = 13,
106 MQTT_MSG_TYPE_DISCONNECT = 14
/**
 * Extract the control packet type from the first byte of a fixed header:
 * masks bits 7..4 and shifts them down, giving a value in 0..15.
 * The argument is parenthesized so that expression arguments (for example
 * `b + 1`) are masked as a whole instead of binding to `&` piecewise.
 */
#define MQTT_CTL_PACKET_TYPE(fixed_hdr_byte0) (((fixed_hdr_byte0) & 0xf0) >> 4)
/**
 * Extract the QoS field from the first byte of a fixed header:
 * masks bits 2..1 and shifts them down, giving a value in 0..3.
 */
#define MQTT_CTL_PACKET_QOS(fixed_hdr_byte0) (((fixed_hdr_byte0) & 0x6) >> 1)
116enum mqtt_connect_flag {
117 MQTT_CONNECT_FLAG_USERNAME = 1 << 7,
118 MQTT_CONNECT_FLAG_PASSWORD = 1 << 6,
119 MQTT_CONNECT_FLAG_WILL_RETAIN = 1 << 5,
120 MQTT_CONNECT_FLAG_WILL = 1 << 2,
121 MQTT_CONNECT_FLAG_CLEAN_SESSION = 1 << 1
/* Forward declaration: mqtt_cyclic_timer is handed to sys_untimeout() before
 * its definition appears further down in this file. */
125static void mqtt_cyclic_timer(
void *
arg);
127#if defined(LWIP_DEBUG)
128static const char *
const mqtt_message_type_str[15] = {
158 return mqtt_message_type_str[
msg_type];
173 if (
client->pkt_id_seq == 0) {
176 return client->pkt_id_seq;
/* Evaluates to MQTT_OUTPUT_RINGBUF_SIZE minus mqtt_ringbuf_len(rb); compared
 * against the required total length when checking for output space. */
223#define mqtt_ringbuf_free(rb) (MQTT_OUTPUT_RINGBUF_SIZE - mqtt_ringbuf_len(rb))
/* Evaluates to the smaller of mqtt_ringbuf_len(rb) and
 * (MQTT_OUTPUT_RINGBUF_SIZE - rb->get). Note that rb is expanded twice, so the
 * argument should be free of side effects.
 * NOTE(review): presumably the second term is the contiguous span from the
 * read index to the end of the buffer before it wraps - confirm against the
 * ring buffer implementation. */
226#define mqtt_ringbuf_linear_read_length(rb) LWIP_MIN(mqtt_ringbuf_len(rb), (MQTT_OUTPUT_RINGBUF_SIZE - (rb)->get))
238 u16_t ringbuf_lin_len = mqtt_ringbuf_linear_read_length(rb);
242 if (send_len == 0 || ringbuf_lin_len == 0) {
246 LWIP_DEBUGF(MQTT_DEBUG_TRACE, (
"mqtt_output_send: tcp_sndbuf: %d bytes, ringbuf_linear_available: %d, get %d, put %d\n",
247 send_len, ringbuf_lin_len, rb->
get, rb->
put));
249 if (send_len > ringbuf_lin_len) {
251 send_len = ringbuf_lin_len;
253 wrap = (mqtt_ringbuf_len(rb) > ringbuf_lin_len);
255 err =
altcp_write(tpcb, mqtt_ringbuf_get_ptr(rb), send_len, TCP_WRITE_FLAG_COPY | (
wrap ? TCP_WRITE_FLAG_MORE : 0));
257 mqtt_ringbuf_advance_get_idx(rb, send_len);
260 err =
altcp_write(tpcb, mqtt_ringbuf_get_ptr(rb), send_len, TCP_WRITE_FLAG_COPY);
264 mqtt_ringbuf_advance_get_idx(rb, send_len);
292 for (
n = 0;
n < r_objs_len;
n++) {
294 if (r_objs[
n].
next == &r_objs[
n]) {
374 prev->next = iter->
next;
396 while (
t > 0 &&
r !=
NULL) {
397 if (
t >=
r->timeout_diff) {
398 t -= (
u8_t)
r->timeout_diff;
405 mqtt_delete_request(
r);
409 r->timeout_diff -=
t;
426 mqtt_delete_request(iter);
436mqtt_init_requests(
struct mqtt_request_t *r_objs,
size_t r_objs_len)
440 for (
n = 0;
n < r_objs_len;
n++) {
442 r_objs[
n].
next = &r_objs[
n];
453 mqtt_ringbuf_put(rb,
value);
459 mqtt_ringbuf_put(rb,
value >> 8);
460 mqtt_ringbuf_put(rb,
value & 0xff);
468 mqtt_ringbuf_put(rb, ((
const u8_t *)
data)[
n]);
476 mqtt_ringbuf_put(rb,
length >> 8);
477 mqtt_ringbuf_put(rb,
length & 0xff);
479 mqtt_ringbuf_put(rb,
str[
n]);
498 mqtt_output_append_u8(rb, (((
msg_type & 0x0f) << 4) | ((fdup & 1) << 3) | ((fqos & 3) << 1) | (fretain & 1)));
501 mqtt_output_append_u8(rb, (r_length & 0x7f) | (r_length >= 128 ? 0x80 : 0));
503 }
while (r_length > 0);
517 u16_t total_len = 1 + r_length;
525 }
while (r_length > 0);
527 return (total_len <= mqtt_ringbuf_free(rb));
556 mqtt_clear_requests(&
client->pend_req_queue);
558 sys_untimeout(mqtt_cyclic_timer,
client);
561 if (
client->conn_state != TCP_DISCONNECTED) {
563 client->conn_state = TCP_DISCONNECTED;
576mqtt_cyclic_timer(
void *
arg)
578 u8_t restart_timer = 1;
582 if (
client->conn_state == MQTT_CONNECTING) {
585 LWIP_DEBUGF(MQTT_DEBUG_TRACE, (
"mqtt_cyclic_timer: CONNECT attempt to server timed out\n"));
590 }
else if (
client->conn_state == MQTT_CONNECTED) {
595 if (
client->keep_alive > 0) {
597 client->server_watchdog++;
600 LWIP_DEBUGF(MQTT_DEBUG_WARN, (
"mqtt_cyclic_timer: Server incoming keep-alive timeout\n"));
608 LWIP_DEBUGF(MQTT_DEBUG_TRACE, (
"mqtt_cyclic_timer: Sending keep-alive message to server\n"));
609 if (mqtt_output_check_space(&
client->output, 0) != 0) {
610 mqtt_output_append_fixed_header(&
client->output, MQTT_MSG_TYPE_PINGREQ, 0, 0, 0, 0);
616 LWIP_DEBUGF(MQTT_DEBUG_WARN, (
"mqtt_cyclic_timer: Timer should not be running in state %d\n",
client->conn_state));
637 if (mqtt_output_check_space(&
client->output, 2)) {
638 mqtt_output_append_fixed_header(&
client->output,
msg, 0, qos, 0, 2);
642 LWIP_DEBUGF(MQTT_DEBUG_TRACE, (
"pub_ack_rec_rel_response: OOM creating response: %s with pkt_id: %d\n",
672 u8_t *var_hdr_payload)
677 u8_t pkt_type = MQTT_CTL_PACKET_TYPE(
client->rx_buffer[0]);
680 LWIP_ASSERT(
"fixed_hdr_len <= client->msg_idx", fixed_hdr_len <= client->msg_idx);
684 if (pkt_type == MQTT_MSG_TYPE_CONNACK) {
685 if (
client->conn_state == MQTT_CONNECTING) {
687 LWIP_DEBUGF(MQTT_DEBUG_WARN,(
"mqtt_message_received: Received short CONNACK message\n"));
692 LWIP_DEBUGF(MQTT_DEBUG_TRACE, (
"mqtt_message_received: Connect response code %d\n",
res));
696 client->conn_state = MQTT_CONNECTED;
703 LWIP_DEBUGF(MQTT_DEBUG_WARN, (
"mqtt_message_received: Received CONNACK in connected state\n"));
705 }
else if (pkt_type == MQTT_MSG_TYPE_PINGRESP) {
706 LWIP_DEBUGF(MQTT_DEBUG_TRACE, (
"mqtt_message_received: Received PINGRESP from server\n"));
708 }
else if (pkt_type == MQTT_MSG_TYPE_PUBLISH) {
709 u16_t payload_offset = 0;
711 u8_t qos = MQTT_CTL_PACKET_QOS(
client->rx_buffer[0]);
715 size_t var_hdr_payload_bufsize =
sizeof(
client->rx_buffer) - fixed_hdr_len;
720 u16_t qos_len = (qos ? 2U : 0
U);
721 if (
length < 2 + qos_len) {
722 LWIP_DEBUGF(MQTT_DEBUG_WARN,(
"mqtt_message_received: Received short PUBLISH packet\n"));
725 topic_len = var_hdr_payload[0];
726 topic_len = (topic_len << 8) + (
u16_t)(var_hdr_payload[1]);
727 if ((topic_len >
length - (2 + qos_len)) ||
728 (topic_len > var_hdr_payload_bufsize - (2 + qos_len))) {
729 LWIP_DEBUGF(MQTT_DEBUG_WARN,(
"mqtt_message_received: Received short PUBLISH packet (topic)\n"));
733 topic = var_hdr_payload + 2;
734 after_topic = 2 + topic_len;
736 if ((after_topic + (qos ? 2U : 1U)) > var_hdr_payload_bufsize) {
737 LWIP_DEBUGF(MQTT_DEBUG_WARN, (
"mqtt_message_received: Receive buffer can not fit topic + pkt_id\n"));
743 if (
length < after_topic + 2U) {
744 LWIP_DEBUGF(MQTT_DEBUG_WARN,(
"mqtt_message_received: Received short PUBLISH packet (after_topic)\n"));
747 client->inpub_pkt_id = ((
u16_t)var_hdr_payload[after_topic] << 8) + (
u16_t)var_hdr_payload[after_topic + 1];
753 bkp =
topic[topic_len];
755 topic[topic_len] = 0;
757 payload_length =
length - after_topic;
758 payload_offset = after_topic;
760 LWIP_DEBUGF(MQTT_DEBUG_TRACE, (
"mqtt_incoming_publish: Received message with QoS %d at topic: %s, payload length %"U32_F"\n",
761 qos,
topic, remaining_length + payload_length));
763 client->pub_cb(
client->inpub_arg, (
const char *)
topic, remaining_length + payload_length);
766 topic[topic_len] = bkp;
768 if (payload_length > 0 || remaining_length == 0) {
769 if (
length < (
size_t)(payload_offset + payload_length)) {
770 LWIP_DEBUGF(MQTT_DEBUG_WARN,(
"mqtt_message_received: Received short packet (payload)\n"));
777 if (remaining_length == 0 && qos > 0) {
779 u8_t resp_msg = (qos == 1) ? MQTT_MSG_TYPE_PUBACK : MQTT_MSG_TYPE_PUBREC;
780 LWIP_DEBUGF(MQTT_DEBUG_TRACE, (
"mqtt_incoming_publish: Sending publish response: %s with pkt_id: %d\n",
781 mqtt_msg_type_to_str(resp_msg),
client->inpub_pkt_id));
782 pub_ack_rec_rel_response(
client, resp_msg,
client->inpub_pkt_id, 0);
787 LWIP_DEBUGF(MQTT_DEBUG_WARN,(
"mqtt_message_received: Received short message\n"));
794 LWIP_DEBUGF(MQTT_DEBUG_WARN, (
"mqtt_message_received: Got message with illegal packet identifier: 0\n"));
797 if (pkt_type == MQTT_MSG_TYPE_PUBREC) {
798 LWIP_DEBUGF(MQTT_DEBUG_TRACE, (
"mqtt_message_received: PUBREC, sending PUBREL with pkt_id: %d\n",
pkt_id));
799 pub_ack_rec_rel_response(
client, MQTT_MSG_TYPE_PUBREL,
pkt_id, 1);
801 }
else if (pkt_type == MQTT_MSG_TYPE_PUBREL) {
802 LWIP_DEBUGF(MQTT_DEBUG_TRACE, (
"mqtt_message_received: PUBREL, sending PUBCOMP response with pkt_id: %d\n",
pkt_id));
803 pub_ack_rec_rel_response(
client, MQTT_MSG_TYPE_PUBCOMP,
pkt_id, 0);
805 }
else if (pkt_type == MQTT_MSG_TYPE_SUBACK || pkt_type == MQTT_MSG_TYPE_UNSUBACK ||
806 pkt_type == MQTT_MSG_TYPE_PUBCOMP || pkt_type == MQTT_MSG_TYPE_PUBACK) {
809 LWIP_DEBUGF(MQTT_DEBUG_TRACE, (
"mqtt_message_received: %s response with id %d\n", mqtt_msg_type_to_str(pkt_type),
pkt_id));
810 if (pkt_type == MQTT_MSG_TYPE_SUBACK) {
812 LWIP_DEBUGF(MQTT_DEBUG_WARN, (
"mqtt_message_received: To small SUBACK packet\n"));
815 mqtt_incoming_suback(
r, var_hdr_payload[2]);
817 }
else if (
r->cb !=
NULL) {
820 mqtt_delete_request(
r);
822 LWIP_DEBUGF(MQTT_DEBUG_WARN, (
"mqtt_message_received: Received %s reply, with wrong pkt_id: %d\n", mqtt_msg_type_to_str(pkt_type),
pkt_id));
825 LWIP_DEBUGF(MQTT_DEBUG_WARN, (
"mqtt_message_received: Received unknown message type: %d\n", pkt_type));
845 u32_t msg_rem_len = 0;
846 u8_t fixed_hdr_len = 0;
849 while (
p->tot_len > in_offset) {
853 if ((fixed_hdr_len < 2) || ((
b & 0x80) != 0)) {
855 if (fixed_hdr_len < client->msg_idx) {
857 b =
client->rx_buffer[fixed_hdr_len];
866 if (fixed_hdr_len >= 2) {
870 msg_rem_len |= (
u32_t)(
b & 0x7f) << ((fixed_hdr_len - 2) * 7);
871 if ((
b & 0x80) == 0) {
873 LWIP_DEBUGF(MQTT_DEBUG_TRACE, (
"mqtt_parse_incoming: Remaining length after fixed header: %"U32_F"\n", msg_rem_len));
874 if (msg_rem_len == 0) {
876 mqtt_message_received(
client, fixed_hdr_len, 0, 0,
NULL);
882 msg_rem_len = (msg_rem_len + fixed_hdr_len) -
client->msg_idx;
888 u16_t cpy_len, buffer_space;
889 u8_t *var_hdr_payload;
897 if (cpy_len > buffer_space) {
898 cpy_len = buffer_space;
902 if (cpy_len > (
p->len - in_offset))
903 cpy_len =
p->len - in_offset;
906 buffer_space, cpy_len, in_offset);
909 client->msg_idx += cpy_len;
910 in_offset += cpy_len;
911 msg_rem_len -= cpy_len;
915 res = mqtt_message_received(
client, fixed_hdr_len, cpy_len, msg_rem_len, var_hdr_payload);
919 if (msg_rem_len == 0) {
946 LWIP_DEBUGF(MQTT_DEBUG_TRACE, (
"mqtt_tcp_recv_cb: Recv pbuf=NULL, remote has closed connection\n"));
951 LWIP_DEBUGF(MQTT_DEBUG_WARN, (
"mqtt_tcp_recv_cb: Recv err=%d\n",
err));
965 if (
client->keep_alive != 0) {
967 client->server_watchdog = 0;
990 if (
client->conn_state == MQTT_CONNECTED) {
995 client->server_watchdog = 0;
997 while ((
r = mqtt_take_request(&
client->pend_req_queue, 0)) !=
NULL) {
998 LWIP_DEBUGF(MQTT_DEBUG_TRACE, (
"mqtt_tcp_sent_cb: Calling QoS 0 publish complete callback\n"));
1002 mqtt_delete_request(
r);
1020 LWIP_DEBUGF(MQTT_DEBUG_TRACE, (
"mqtt_tcp_err_cb: TCP error callback: error %d, arg: %p\n",
err,
arg));
1037 if (
client->conn_state == MQTT_CONNECTED) {
1039 mqtt_output_send(&
client->output, tpcb);
1056 LWIP_DEBUGF(MQTT_DEBUG_WARN, (
"mqtt_tcp_connect_cb: TCP connect error %d\n",
err));
1068 LWIP_DEBUGF(MQTT_DEBUG_TRACE, (
"mqtt_tcp_connect_cb: TCP connection established to server\n"));
1070 client->conn_state = MQTT_CONNECTING;
1109 size_t topic_strlen;
1112 u16_t remaining_length;
1120 LWIP_ERROR(
"mqtt_publish: topic length overflow", (topic_strlen <= (0xFFFF - 2)),
return ERR_ARG);
1121 topic_len = (
u16_t)topic_strlen;
1122 total_len = 2 + topic_len + payload_length;
1132 LWIP_ERROR(
"mqtt_publish: total length overflow", (total_len <= 0xFFFF),
return ERR_ARG);
1133 remaining_length = (
u16_t)total_len;
1135 LWIP_DEBUGF(MQTT_DEBUG_TRACE, (
"mqtt_publish: Publish with payload length %d to topic \"%s\"\n", payload_length,
topic));
1142 if (mqtt_output_check_space(&
client->output, remaining_length) == 0) {
1143 mqtt_delete_request(
r);
1147 mqtt_output_append_fixed_header(&
client->output, MQTT_MSG_TYPE_PUBLISH, 0, qos, retain, remaining_length);
1150 mqtt_output_append_string(&
client->output,
topic, topic_len);
1158 if ((payload !=
NULL) && (payload_length > 0)) {
1159 mqtt_output_append_buf(&
client->output, payload, payload_length);
1162 mqtt_append_request(&
client->pend_req_queue,
r);
1182 size_t topic_strlen;
1185 u16_t remaining_length;
1194 LWIP_ERROR(
"mqtt_sub_unsub: topic length overflow", (topic_strlen <= (0xFFFF - 2)),
return ERR_ARG);
1195 topic_len = (
u16_t)topic_strlen;
1197 total_len = topic_len + 2 + 2 + (sub != 0);
1198 LWIP_ERROR(
"mqtt_sub_unsub: total length overflow", (total_len <= 0xFFFF),
return ERR_ARG);
1199 remaining_length = (
u16_t)total_len;
1202 if (
client->conn_state == TCP_DISCONNECTED) {
1203 LWIP_DEBUGF(MQTT_DEBUG_WARN, (
"mqtt_sub_unsub: Can not (un)subscribe in disconnected state\n"));
1213 if (mqtt_output_check_space(&
client->output, remaining_length) == 0) {
1214 mqtt_delete_request(
r);
1218 LWIP_DEBUGF(MQTT_DEBUG_TRACE, (
"mqtt_sub_unsub: Client (un)subscribe to topic \"%s\", id: %d\n",
topic,
pkt_id));
1220 mqtt_output_append_fixed_header(&
client->output, sub ? MQTT_MSG_TYPE_SUBSCRIBE : MQTT_MSG_TYPE_UNSUBSCRIBE, 0, 1, 0, remaining_length);
1224 mqtt_output_append_string(&
client->output,
topic, topic_len);
1230 mqtt_append_request(&
client->pend_req_queue,
r);
1250 client->data_cb = data_cb;
1295 u16_t client_id_length;
1297 u16_t remaining_length = 2 + 4 + 1 + 1 + 2;
1298 u8_t flags = 0, will_topic_len = 0, will_msg_len = 0;
1299 u16_t client_user_len = 0, client_pass_len = 0;
1310 if (
client->conn_state != TCP_DISCONNECTED) {
1311 LWIP_DEBUGF(MQTT_DEBUG_WARN, (
"mqtt_client_connect: Already connected\n"));
1316 data_cb =
client->data_cb;
1318 inpub_arg =
client->inpub_arg;
1320 client->data_cb = data_cb;
1322 client->inpub_arg = inpub_arg;
1331 flags |= MQTT_CONNECT_FLAG_WILL;
1334 flags |= MQTT_CONNECT_FLAG_WILL_RETAIN;
1337 LWIP_ERROR(
"mqtt_client_connect: client_info->will_topic length overflow",
len <= 0xFF,
return ERR_VAL);
1338 LWIP_ERROR(
"mqtt_client_connect: client_info->will_topic length must be > 0",
len > 0,
return ERR_VAL);
1341 LWIP_ERROR(
"mqtt_client_connect: client_info->will_msg length overflow",
len <= 0xFF,
return ERR_VAL);
1343 len = remaining_length + 2 + will_topic_len + 2 + will_msg_len;
1348 flags |= MQTT_CONNECT_FLAG_USERNAME;
1350 LWIP_ERROR(
"mqtt_client_connect: client_info->client_user length overflow",
len <= 0xFFFF,
return ERR_VAL);
1351 LWIP_ERROR(
"mqtt_client_connect: client_info->client_user length must be > 0",
len > 0,
return ERR_VAL);
1353 len = remaining_length + 2 + client_user_len;
1358 flags |= MQTT_CONNECT_FLAG_PASSWORD;
1360 LWIP_ERROR(
"mqtt_client_connect: client_info->client_pass length overflow",
len <= 0xFFFF,
return ERR_VAL);
1361 LWIP_ERROR(
"mqtt_client_connect: client_info->client_pass length must be > 0",
len > 0,
return ERR_VAL);
1363 len = remaining_length + 2 + client_pass_len;
1369 flags |= MQTT_CONNECT_FLAG_CLEAN_SESSION;
1372 LWIP_ERROR(
"mqtt_client_connect: client_info->client_id length overflow",
len <= 0xFFFF,
return ERR_VAL);
1374 len = remaining_length + 2 + client_id_length;
1378 if (mqtt_output_check_space(&
client->output, remaining_length) == 0) {
1382#if LWIP_ALTCP && LWIP_ALTCP_TLS
1399 LWIP_DEBUGF(MQTT_DEBUG_WARN, (
"mqtt_client_connect: Error binding to local ip/port, %d\n",
err));
1407 LWIP_DEBUGF(MQTT_DEBUG_TRACE, (
"mqtt_client_connect: Error connecting to remote ip/port, %d\n",
err));
1412 client->conn_state = TCP_CONNECTING;
1415 mqtt_output_append_fixed_header(&
client->output, MQTT_MSG_TYPE_CONNECT, 0, 0, 0, remaining_length);
1417 mqtt_output_append_string(&
client->output,
"MQTT", 4);
1419 mqtt_output_append_u8(&
client->output, 4);
1425 mqtt_output_append_string(&
client->output,
client_info->client_id, client_id_length);
1427 if ((
flags & MQTT_CONNECT_FLAG_WILL) != 0) {
1428 mqtt_output_append_string(&
client->output,
client_info->will_topic, will_topic_len);
1429 mqtt_output_append_string(&
client->output,
client_info->will_msg, will_msg_len);
1432 if ((
flags & MQTT_CONNECT_FLAG_USERNAME) != 0) {
1433 mqtt_output_append_string(&
client->output,
client_info->client_user, client_user_len);
1436 if ((
flags & MQTT_CONNECT_FLAG_PASSWORD) != 0) {
1437 mqtt_output_append_string(&
client->output,
client_info->client_pass, client_pass_len);
1459 if (
client->conn_state != TCP_DISCONNECTED) {
1461 client->conn_state = TCP_DISCONNECTED;
1477 return client->conn_state == MQTT_CONNECTED;
ACPI_SIZE strlen(const char *String)
struct outqueuenode * tail
struct outqueuenode * head
#define altcp_tcp_new_ip_type
#define LWIP_ARRAYSIZE(x)
#define mem_free(ptr, bsize)
static WCHAR reason[MAX_STRING_RESOURCE_LEN]
#define wrap(journal, var)
void * mem_calloc(mem_size_t count, mem_size_t size)
#define LWIP_DEBUGF(debug, message)
#define LWIP_ERROR(message, expression, handler)
#define LWIP_ASSERT(message, assertion)
GLint GLenum GLsizei GLsizei GLsizei GLint GLsizei const GLvoid * data
GLdouble GLdouble GLdouble r
GLboolean GLboolean GLboolean b
GLuint GLsizei GLsizei * length
#define time_before(a, b)
#define LWIP_UNUSED_ARG(x)
#define LWIP_ASSERT_CORE_LOCKED()
#define MQTT_CYCLIC_TIMER_INTERVAL
#define MQTT_OUTPUT_RINGBUF_SIZE
#define MQTT_VAR_HEADER_BUFFER_LEN
#define MQTT_CONNECT_TIMOUT
void(* mqtt_incoming_publish_cb_t)(void *arg, const char *topic, u32_t tot_len)
void(* mqtt_connection_cb_t)(mqtt_client_t *client, void *arg, mqtt_connection_status_t status)
void(* mqtt_request_cb_t)(void *arg, err_t err)
void(* mqtt_incoming_data_cb_t)(void *arg, const u8_t *data, u16_t len, u8_t flags)
@ MQTT_CONNECT_DISCONNECTED
u8_t pbuf_get_at(const struct pbuf *p, u16_t offset)
void * pbuf_get_contiguous(const struct pbuf *p, void *buffer, size_t bufsize, u16_t len, u16_t offset)
u8_t pbuf_free(struct pbuf *p)
#define IP_GET_TYPE(ipaddr)
#define ipaddr_ntoa(ipaddr)
static HMODULE MODULEINFO DWORD cb
void mqtt_disconnect(mqtt_client_t *client)
u8_t mqtt_client_is_connected(mqtt_client_t *client)
void mqtt_client_free(mqtt_client_t *client)
err_t mqtt_client_connect(mqtt_client_t *client, const ip_addr_t *ipaddr, u16_t port, mqtt_connection_cb_t cb, void *arg, const struct mqtt_connect_client_info_t *client_info)
mqtt_client_t * mqtt_client_new(void)
err_t mqtt_publish(mqtt_client_t *client, const char *topic, const void *payload, u16_t payload_length, u8_t qos, u8_t retain, mqtt_request_cb_t cb, void *arg)
err_t mqtt_sub_unsub(mqtt_client_t *client, const char *topic, u8_t qos, mqtt_request_cb_t cb, void *arg, u8_t sub)
void mqtt_set_inpub_callback(mqtt_client_t *client, mqtt_incoming_publish_cb_t pub_cb, mqtt_incoming_data_cb_t data_cb, void *arg)
static const char topic[]
static unsigned __int64 next
struct mqtt_request_t * next
u8_t buf[MQTT_OUTPUT_RINGBUF_SIZE]