source: azure_iot_hub_f767zi/trunk/azure_iot_sdk/umqtt/src/mqtt_client.c@ 457

Last change on this file since 457 was 457, checked in by coas-nagasima, 4 years ago

ファイルを追加

  • Property svn:eol-style set to native
  • Property svn:mime-type set to text/x-csrc;charset=UTF-8
File size: 51.6 KB
Line 
1// Copyright (c) Microsoft. All rights reserved.
2// Licensed under the MIT license. See LICENSE file in the project root for full license information.
3
4#include <stdlib.h>
5#include <stdbool.h>
6#include "azure_c_shared_utility/agenttime.h"
7#include "azure_c_shared_utility/const_defines.h"
8#include "azure_c_shared_utility/crt_abstractions.h"
9#include "azure_c_shared_utility/gballoc.h"
10#include "azure_c_shared_utility/optimize_size.h"
11#include "azure_macro_utils/macro_utils.h"
12#include "azure_c_shared_utility/platform.h"
13#include "azure_c_shared_utility/strings.h"
14#include "azure_c_shared_utility/threadapi.h"
15#include "azure_c_shared_utility/tickcounter.h"
16#include "azure_c_shared_utility/xlogging.h"
17
18#include "azure_umqtt_c/mqtt_client.h"
19#include "azure_umqtt_c/mqtt_codec.h"
20#include <inttypes.h>
21
22#define VARIABLE_HEADER_OFFSET 2
23#define RETAIN_FLAG_MASK 0x1
24#define QOS_LEAST_ONCE_FLAG_MASK 0x2
25#define QOS_EXACTLY_ONCE_FLAG_MASK 0x4
26#define DUPLICATE_FLAG_MASK 0x8
27#define CONNECT_PACKET_MASK 0xf0
28#define TIME_MAX_BUFFER 16
29#define DEFAULT_MAX_PING_RESPONSE_TIME 80 // % of time to send pings
30#define MAX_CLOSE_RETRIES 20
31
32static const char* const TRUE_CONST = "true";
33static const char* const FALSE_CONST = "false";
34
35MU_DEFINE_ENUM_STRINGS(QOS_VALUE, QOS_VALUE_VALUES);
36
37#define MQTT_STATUS_INITIAL_STATUS 0x0000
38#define MQTT_STATUS_CLIENT_CONNECTED 0x0001
39#define MQTT_STATUS_SOCKET_CONNECTED 0x0002
40#define MQTT_STATUS_PENDING_CLOSE 0x0004
41
42#define MQTT_FLAGS_LOG_TRACE 0x0001
43#define MQTT_FLAGS_RAW_TRACE 0x0002
44
45typedef struct MQTT_CLIENT_TAG
46{
47 XIO_HANDLE xioHandle;
48 MQTTCODEC_HANDLE codec_handle;
49 CONTROL_PACKET_TYPE packetState;
50 TICK_COUNTER_HANDLE packetTickCntr;
51 tickcounter_ms_t packetSendTimeMs;
52 ON_MQTT_OPERATION_CALLBACK fnOperationCallback;
53 ON_MQTT_MESSAGE_RECV_CALLBACK fnMessageRecv;
54 void* ctx;
55 ON_MQTT_ERROR_CALLBACK fnOnErrorCallBack;
56 void* errorCBCtx;
57 ON_MQTT_DISCONNECTED_CALLBACK disconnect_cb;
58 void* disconnect_ctx;
59 QOS_VALUE qosValue;
60 uint16_t keepAliveInterval;
61 MQTT_CLIENT_OPTIONS mqttOptions;
62
63 uint16_t mqtt_status;
64 uint16_t mqtt_flags;
65
66 tickcounter_ms_t timeSincePing;
67 uint16_t maxPingRespTime;
68} MQTT_CLIENT;
69
70static bool is_trace_enabled(MQTT_CLIENT* mqtt_client)
71{
72 return (mqtt_client->mqtt_flags & MQTT_FLAGS_LOG_TRACE);
73}
74
75#ifdef ENABLE_RAW_TRACE
76static bool is_raw_trace_enabled(MQTT_CLIENT* mqtt_client)
77{
78 return (mqtt_client->mqtt_flags & MQTT_FLAGS_RAW_TRACE);
79}
80#endif // ENABLE_RAW_TRACE
81
82static void on_connection_closed(void* context)
83{
84 MQTT_CLIENT* mqtt_client = (MQTT_CLIENT*)context;
85 if (mqtt_client != NULL)
86 {
87 mqtt_client->mqtt_status &= ~MQTT_STATUS_SOCKET_CONNECTED;
88 if (mqtt_client->disconnect_cb)
89 {
90 mqtt_client->disconnect_cb(mqtt_client->disconnect_ctx);
91 }
92 }
93}
94
95static void close_connection(MQTT_CLIENT* mqtt_client)
96{
97 if (mqtt_client->mqtt_status & MQTT_STATUS_SOCKET_CONNECTED)
98 {
99 (void)xio_close(mqtt_client->xioHandle, on_connection_closed, mqtt_client);
100 if (mqtt_client->disconnect_cb == NULL)
101 {
102 size_t counter = 0;
103 do
104 {
105 xio_dowork(mqtt_client->xioHandle);
106 counter++;
107 ThreadAPI_Sleep(2);
108 } while (mqtt_client->mqtt_status & MQTT_STATUS_SOCKET_CONNECTED && counter < MAX_CLOSE_RETRIES);
109 }
110 // Clear the handle because we don't use it anymore
111 mqtt_client->xioHandle = NULL;
112 }
113 else
114 {
115 mqtt_client->mqtt_status &= ~MQTT_STATUS_SOCKET_CONNECTED;
116 if (mqtt_client->disconnect_cb)
117 {
118 mqtt_client->disconnect_cb(mqtt_client->disconnect_ctx);
119 }
120 }
121}
122
123static void set_error_callback(MQTT_CLIENT* mqtt_client, MQTT_CLIENT_EVENT_ERROR error_type)
124{
125 if (mqtt_client->fnOnErrorCallBack)
126 {
127 mqtt_client->fnOnErrorCallBack(mqtt_client, error_type, mqtt_client->errorCBCtx);
128 }
129 close_connection(mqtt_client);
130}
131
132static STRING_HANDLE construct_trace_log_handle(MQTT_CLIENT* mqtt_client)
133{
134 STRING_HANDLE trace_log;
135 if (is_trace_enabled(mqtt_client) )
136 {
137 trace_log = STRING_new();
138 }
139 else
140 {
141 trace_log = NULL;
142 }
143 return trace_log;
144}
145
146static uint16_t byteutil_read_uint16(uint8_t** buffer, size_t byteLen)
147{
148 uint16_t result = 0;
149 if (buffer != NULL && *buffer != NULL && byteLen >= 2)
150 {
151 result = 256 * (**buffer) + (*(*buffer + 1));
152 *buffer += 2; // Move the ptr
153 }
154 else
155 {
156 LogError("byteutil_read_uint16 == NULL or less than 2");
157 }
158 return result;
159}
160
161static char* byteutil_readUTF(uint8_t** buffer, size_t* byteLen)
162{
163 char* result = NULL;
164
165 const uint8_t* bufferInitial = *buffer;
166 // Get the length of the string
167 uint16_t stringLen = byteutil_read_uint16(buffer, *byteLen);
168 // Verify that byteutil_read_uint16 succeeded (by stringLen>0) and that we're
169 // not being asked to read a string longer than buffer passed in.
170 if ((stringLen > 0) && ((size_t)(stringLen + (*buffer - bufferInitial)) <= *byteLen))
171 {
172 result = (char*)malloc(stringLen + 1);
173 if (result != NULL)
174 {
175 (void)memcpy(result, *buffer, stringLen);
176 result[stringLen] = '\0';
177 *buffer += stringLen;
178 *byteLen = stringLen;
179 }
180 }
181 else
182 {
183 LogError("String passed not a valid UTF.");
184 }
185
186 return result;
187}
188
189static uint8_t byteutil_readByte(uint8_t** buffer)
190{
191 uint8_t result = 0;
192 if (buffer != NULL)
193 {
194 result = **buffer;
195 (*buffer)++;
196 }
197 else
198 {
199 LogError("readByte buffer == NULL.");
200 }
201 return result;
202}
203
204static void sendComplete(void* context, IO_SEND_RESULT send_result)
205{
206 MQTT_CLIENT* mqtt_client = (MQTT_CLIENT*)context;
207 if (mqtt_client != NULL)
208 {
209 if (send_result == IO_SEND_OK)
210 {
211 if (mqtt_client->packetState == DISCONNECT_TYPE)
212 {
213 /*Codes_SRS_MQTT_CLIENT_07_032: [If the actionResult parameter is of type MQTT_CLIENT_ON_DISCONNECT the the msgInfo value shall be NULL.]*/
214 if (mqtt_client->fnOperationCallback != NULL)
215 {
216 mqtt_client->fnOperationCallback(mqtt_client, MQTT_CLIENT_ON_DISCONNECT, NULL, mqtt_client->ctx);
217 }
218
219 // Mark to close
220 mqtt_client->mqtt_status |= MQTT_STATUS_PENDING_CLOSE;
221 }
222 }
223 else if (send_result == IO_SEND_ERROR)
224 {
225 LogError("MQTT Send Complete Failure send_result: %d", (int)send_result);
226 set_error_callback(mqtt_client, MQTT_CLIENT_COMMUNICATION_ERROR);
227 }
228 }
229 else
230 {
231 LogError("MQTT Send Complete Failure with NULL mqtt_client");
232 }
233}
234
235#ifndef NO_LOGGING
236static void getLogTime(char* timeResult, size_t len)
237{
238 if (timeResult != NULL)
239 {
240 time_t agent_time = get_time(NULL);
241 if (agent_time == (time_t)-1)
242 {
243 timeResult[0] = '\0';
244 }
245 else
246 {
247 struct tm* tmInfo = localtime(&agent_time);
248 if (tmInfo == NULL)
249 {
250 timeResult[0] = '\0';
251 }
252 else
253 {
254 if (strftime(timeResult, len, "%H:%M:%S", tmInfo) == 0)
255 {
256 timeResult[0] = '\0';
257 }
258 }
259 }
260 }
261}
262
263#ifdef ENABLE_RAW_TRACE
264static const char* retrievePacketType(CONTROL_PACKET_TYPE packet)
265{
266 switch (packet&CONNECT_PACKET_MASK)
267 {
268 case CONNECT_TYPE: return "CONNECT";
269 case CONNACK_TYPE: return "CONNACK";
270 case PUBLISH_TYPE: return "PUBLISH";
271 case PUBACK_TYPE: return "PUBACK";
272 case PUBREC_TYPE: return "PUBREC";
273 case PUBREL_TYPE: return "PUBREL";
274 case SUBSCRIBE_TYPE: return "SUBSCRIBE";
275 case SUBACK_TYPE: return "SUBACK";
276 case UNSUBSCRIBE_TYPE: return "UNSUBSCRIBE";
277 case UNSUBACK_TYPE: return "UNSUBACK";
278 case PINGREQ_TYPE: return "PINGREQ";
279 case PINGRESP_TYPE: return "PINGRESP";
280 case DISCONNECT_TYPE: return "DISCONNECT";
281 default:
282 case PACKET_TYPE_ERROR:
283 case UNKNOWN_TYPE:
284 return "UNKNOWN";
285 }
286}
287
288static void logOutgoingRawTrace(MQTT_CLIENT* mqtt_client, const uint8_t* data, size_t length)
289{
290 if (mqtt_client != NULL && data != NULL && length > 0 && is_raw_trace_enabled(mqtt_client))
291 {
292 char tmBuffer[TIME_MAX_BUFFER];
293 getLogTime(tmBuffer, TIME_MAX_BUFFER);
294
295 LOG(AZ_LOG_TRACE, 0, "-> %s %s: ", tmBuffer, retrievePacketType((unsigned char)data[0]));
296 size_t index = 0;
297 for (index = 0; index < length; index++)
298 {
299 LOG(AZ_LOG_TRACE, 0, "0x%02x ", data[index]);
300 }
301 LOG(AZ_LOG_TRACE, LOG_LINE, " ");
302 }
303}
304
305static void logIncomingRawTrace(MQTT_CLIENT* mqtt_client, CONTROL_PACKET_TYPE packet, uint8_t flags, const uint8_t* data, size_t length)
306{
307 if (mqtt_client != NULL && is_raw_trace_enabled(mqtt_client))
308 {
309 if (data != NULL && length > 0)
310 {
311 char tmBuffer[TIME_MAX_BUFFER];
312 getLogTime(tmBuffer, TIME_MAX_BUFFER);
313
314 LOG(AZ_LOG_TRACE, 0, "<- %s %s: 0x%02x 0x%02x ", tmBuffer, retrievePacketType((CONTROL_PACKET_TYPE)packet), (unsigned int)(packet | flags), (unsigned int)length);
315 size_t index = 0;
316 for (index = 0; index < length; index++)
317 {
318 LOG(AZ_LOG_TRACE, 0, "0x%02x ", data[index]);
319 }
320 LOG(AZ_LOG_TRACE, LOG_LINE, " ");
321 }
322 else if (packet == PINGRESP_TYPE)
323 {
324 char tmBuffer[TIME_MAX_BUFFER];
325 getLogTime(tmBuffer, TIME_MAX_BUFFER);
326 LOG(AZ_LOG_TRACE, LOG_LINE, "<- %s %s: 0x%02x 0x%02x ", tmBuffer, retrievePacketType((CONTROL_PACKET_TYPE)packet), (unsigned int)(packet | flags), (unsigned int)length);
327 }
328 }
329}
330#endif // ENABLE_RAW_TRACE
331
332static void log_outgoing_trace(MQTT_CLIENT* mqtt_client, STRING_HANDLE trace_log)
333{
334 if (mqtt_client != NULL && is_trace_enabled(mqtt_client) && trace_log != NULL)
335 {
336 char tmBuffer[TIME_MAX_BUFFER];
337 getLogTime(tmBuffer, TIME_MAX_BUFFER);
338 LOG(AZ_LOG_TRACE, LOG_LINE, "-> %s %s", tmBuffer, STRING_c_str(trace_log));
339 }
340}
341
342static void log_incoming_trace(MQTT_CLIENT* mqtt_client, STRING_HANDLE trace_log)
343{
344 if (mqtt_client != NULL && is_trace_enabled(mqtt_client) && trace_log != NULL)
345 {
346 char tmBuffer[TIME_MAX_BUFFER];
347 getLogTime(tmBuffer, TIME_MAX_BUFFER);
348 LOG(AZ_LOG_TRACE, LOG_LINE, "<- %s %s", tmBuffer, STRING_c_str(trace_log) );
349 }
350}
351#else // NO_LOGGING
352static void logOutgoingRawTrace(MQTT_CLIENT* mqtt_client, const uint8_t* data, size_t length)
353{
354 AZURE_UNREFERENCED_PARAMETER(mqtt_client);
355 AZURE_UNREFERENCED_PARAMETER(data);
356 AZURE_UNREFERENCED_PARAMETER(length);
357}
358
359static void log_outgoing_trace(MQTT_CLIENT* mqtt_client, STRING_HANDLE trace_log)
360{
361 AZURE_UNREFERENCED_PARAMETER(mqtt_client);
362 AZURE_UNREFERENCED_PARAMETER(trace_log);
363}
364
365static void log_incoming_trace(MQTT_CLIENT* mqtt_client, STRING_HANDLE trace_log)
366{
367 AZURE_UNREFERENCED_PARAMETER(mqtt_client);
368 AZURE_UNREFERENCED_PARAMETER(trace_log);
369}
370
371static void logIncomingRawTrace(MQTT_CLIENT* mqtt_client, CONTROL_PACKET_TYPE packet, uint8_t flags, const uint8_t* data, size_t length)
372{
373 AZURE_UNREFERENCED_PARAMETER(mqtt_client);
374 AZURE_UNREFERENCED_PARAMETER(packet);
375 AZURE_UNREFERENCED_PARAMETER(flags);
376 AZURE_UNREFERENCED_PARAMETER(data);
377 AZURE_UNREFERENCED_PARAMETER(length);
378}
379#endif // NO_LOGGING
380
381static int sendPacketItem(MQTT_CLIENT* mqtt_client, const unsigned char* data, size_t length)
382{
383 int result;
384
385 if (tickcounter_get_current_ms(mqtt_client->packetTickCntr, &mqtt_client->packetSendTimeMs) != 0)
386 {
387 LogError("Failure getting current ms tickcounter");
388 result = MU_FAILURE;
389 }
390 else
391 {
392 result = xio_send(mqtt_client->xioHandle, (const void*)data, length, sendComplete, mqtt_client);
393 if (result != 0)
394 {
395 LogError("Failure sending control packet data");
396 result = MU_FAILURE;
397 }
398 else
399 {
400#ifdef ENABLE_RAW_TRACE
401 logOutgoingRawTrace(mqtt_client, (const uint8_t*)data, length);
402#endif
403 }
404 }
405 return result;
406}
407
408static void onOpenComplete(void* context, IO_OPEN_RESULT open_result)
409{
410 MQTT_CLIENT* mqtt_client = (MQTT_CLIENT*)context;
411 if (mqtt_client != NULL)
412 {
413 if (open_result == IO_OPEN_OK && !(mqtt_client->mqtt_status & MQTT_STATUS_SOCKET_CONNECTED))
414 {
415 mqtt_client->packetState = CONNECT_TYPE;
416 mqtt_client->mqtt_status |= MQTT_STATUS_SOCKET_CONNECTED;
417
418 STRING_HANDLE trace_log = construct_trace_log_handle(mqtt_client);
419
420 // Send the Connect packet
421 BUFFER_HANDLE connPacket = mqtt_codec_connect(&mqtt_client->mqttOptions, trace_log);
422 if (connPacket == NULL)
423 {
424 LogError("Error: mqtt_codec_connect failed");
425 }
426 else
427 {
428 size_t size = BUFFER_length(connPacket);
429 /*Codes_SRS_MQTT_CLIENT_07_009: [On success mqtt_client_connect shall send the MQTT CONNECT to the endpoint.]*/
430 if (sendPacketItem(mqtt_client, BUFFER_u_char(connPacket), size) != 0)
431 {
432 LogError("Error: failure sending connect packet");
433 // Set the status to pending close because we connot continue
434 // with CONN failing to send
435 if (mqtt_client->fnOnErrorCallBack)
436 {
437 mqtt_client->fnOnErrorCallBack(mqtt_client, MQTT_CLIENT_CONNECTION_ERROR, mqtt_client->errorCBCtx);
438 }
439 mqtt_client->mqtt_status |= MQTT_STATUS_PENDING_CLOSE;
440 }
441 else
442 {
443 log_outgoing_trace(mqtt_client, trace_log);
444 }
445 BUFFER_delete(connPacket);
446 }
447 if (trace_log != NULL)
448 {
449 STRING_delete(trace_log);
450 }
451 }
452 else
453 {
454 LogError("Error: failure opening connection to endpoint");
455 if (!(mqtt_client->mqtt_status & MQTT_STATUS_SOCKET_CONNECTED) && mqtt_client->fnOnErrorCallBack)
456 {
457 mqtt_client->fnOnErrorCallBack(mqtt_client, MQTT_CLIENT_CONNECTION_ERROR, mqtt_client->errorCBCtx);
458 }
459 mqtt_client->mqtt_status |= MQTT_STATUS_PENDING_CLOSE;
460 }
461 }
462 else
463 {
464 LogError("Error: mqtt_client is NULL");
465 }
466}
467
468static void onBytesReceived(void* context, const unsigned char* buffer, size_t size)
469{
470 MQTT_CLIENT* mqtt_client = (MQTT_CLIENT*)context;
471 if (mqtt_client != NULL)
472 {
473 if (mqtt_codec_bytesReceived(mqtt_client->codec_handle, buffer, size) != 0)
474 {
475 mqtt_codec_reset(mqtt_client->codec_handle);
476 set_error_callback(mqtt_client, MQTT_CLIENT_PARSE_ERROR);
477 }
478 }
479 else
480 {
481 LogError("Error: mqtt_client is NULL");
482 }
483}
484
485static void onIoError(void* context)
486{
487 MQTT_CLIENT* mqtt_client = (MQTT_CLIENT*)context;
488 if (mqtt_client != NULL && mqtt_client->fnOperationCallback)
489 {
490 /*Codes_SRS_MQTT_CLIENT_07_032: [If the actionResult parameter is of type MQTT_CLIENT_ON_DISCONNECT the the msgInfo value shall be NULL.]*/
491 /* Codes_SRS_MQTT_CLIENT_07_036: [ If an error is encountered by the ioHandle the mqtt_client shall call xio_close. ] */
492 set_error_callback(mqtt_client, MQTT_CLIENT_CONNECTION_ERROR);
493 }
494 else
495 {
496 LogError("Error invalid parameter: mqtt_client: %p", mqtt_client);
497 }
498}
499
500static void clear_mqtt_options(MQTT_CLIENT* mqtt_client)
501{
502 if (mqtt_client->mqttOptions.clientId != NULL)
503 {
504 free(mqtt_client->mqttOptions.clientId);
505 mqtt_client->mqttOptions.clientId = NULL;
506 }
507
508 if (mqtt_client->mqttOptions.willTopic != NULL)
509 {
510 free(mqtt_client->mqttOptions.willTopic);
511 mqtt_client->mqttOptions.willTopic = NULL;
512 }
513
514 if (mqtt_client->mqttOptions.willMessage != NULL)
515 {
516 free(mqtt_client->mqttOptions.willMessage);
517 mqtt_client->mqttOptions.willMessage = NULL;
518 }
519
520 if (mqtt_client->mqttOptions.username != NULL)
521 {
522 free(mqtt_client->mqttOptions.username);
523 mqtt_client->mqttOptions.username = NULL;
524 }
525
526 if (mqtt_client->mqttOptions.password != NULL)
527 {
528 free(mqtt_client->mqttOptions.password);
529 mqtt_client->mqttOptions.password = NULL;
530 }
531}
532
533static int cloneMqttOptions(MQTT_CLIENT* mqtt_client, const MQTT_CLIENT_OPTIONS* mqttOptions)
534{
535 int result = 0;
536 char* temp_option;
537
538 if (mqttOptions->clientId != NULL)
539 {
540 if (mallocAndStrcpy_s(&temp_option, mqttOptions->clientId) != 0)
541 {
542 result = MU_FAILURE;
543 LogError("mallocAndStrcpy_s clientId");
544 }
545 else
546 {
547 if (mqtt_client->mqttOptions.clientId != NULL)
548 {
549 free(mqtt_client->mqttOptions.clientId);
550 }
551 mqtt_client->mqttOptions.clientId = temp_option;
552 }
553 }
554 if (result == 0 && mqttOptions->willTopic != NULL)
555 {
556 temp_option = NULL;
557 if (mallocAndStrcpy_s(&temp_option, mqttOptions->willTopic) != 0)
558 {
559 result = MU_FAILURE;
560 LogError("mallocAndStrcpy_s willTopic");
561 }
562 else
563 {
564 if (mqtt_client->mqttOptions.willTopic != NULL)
565 {
566 free(mqtt_client->mqttOptions.willTopic);
567 }
568 mqtt_client->mqttOptions.willTopic = temp_option;
569 }
570 }
571 if (result == 0 && mqttOptions->willMessage != NULL)
572 {
573 temp_option = NULL;
574 if (mallocAndStrcpy_s(&temp_option, mqttOptions->willMessage) != 0)
575 {
576 LogError("mallocAndStrcpy_s willMessage");
577 result = MU_FAILURE;
578 }
579 else
580 {
581 if (mqtt_client->mqttOptions.willMessage != NULL)
582 {
583 free(mqtt_client->mqttOptions.willMessage);
584 }
585 mqtt_client->mqttOptions.willMessage = temp_option;
586 }
587 }
588 if (result == 0 && mqttOptions->username != NULL)
589 {
590 temp_option = NULL;
591 if (mallocAndStrcpy_s(&temp_option, mqttOptions->username) != 0)
592 {
593 LogError("mallocAndStrcpy_s username");
594 result = MU_FAILURE;
595 }
596 else
597 {
598 if (mqtt_client->mqttOptions.username != NULL)
599 {
600 free(mqtt_client->mqttOptions.username);
601 }
602 mqtt_client->mqttOptions.username = temp_option;
603 }
604 }
605 if (result == 0 && mqttOptions->password != NULL)
606 {
607 temp_option = NULL;
608 if (mallocAndStrcpy_s(&temp_option, mqttOptions->password) != 0)
609 {
610 LogError("mallocAndStrcpy_s password");
611 result = MU_FAILURE;
612 }
613 else
614 {
615 if (mqtt_client->mqttOptions.password != NULL)
616 {
617 free(mqtt_client->mqttOptions.password);
618 }
619 mqtt_client->mqttOptions.password = temp_option;
620 }
621 }
622 if (result == 0)
623 {
624 mqtt_client->mqttOptions.keepAliveInterval = mqttOptions->keepAliveInterval;
625 mqtt_client->mqttOptions.messageRetain = mqttOptions->messageRetain;
626 mqtt_client->mqttOptions.useCleanSession = mqttOptions->useCleanSession;
627 mqtt_client->mqttOptions.qualityOfServiceValue = mqttOptions->qualityOfServiceValue;
628 }
629 else
630 {
631 clear_mqtt_options(mqtt_client);
632 }
633 return result;
634}
635
636static void ProcessPublishMessage(MQTT_CLIENT* mqtt_client, uint8_t* initialPos, size_t packetLength, int flags)
637{
638 bool isDuplicateMsg = (flags & DUPLICATE_FLAG_MASK) ? true : false;
639 bool isRetainMsg = (flags & RETAIN_FLAG_MASK) ? true : false;
640 QOS_VALUE qosValue = (flags == 0) ? DELIVER_AT_MOST_ONCE : (flags & QOS_LEAST_ONCE_FLAG_MASK) ? DELIVER_AT_LEAST_ONCE : DELIVER_EXACTLY_ONCE;
641
642 uint8_t* iterator = initialPos;
643 size_t numberOfBytesToBeRead = packetLength;
644 size_t lengthOfTopicName = numberOfBytesToBeRead;
645 char* topicName = byteutil_readUTF(&iterator, &lengthOfTopicName);
646 if (topicName == NULL)
647 {
648 LogError("Publish MSG: failure reading topic name");
649 set_error_callback(mqtt_client, MQTT_CLIENT_PARSE_ERROR);
650 }
651 else
652 {
653 STRING_HANDLE trace_log = NULL;
654
655#ifndef NO_LOGGING
656 if (is_trace_enabled(mqtt_client))
657 {
658 trace_log = STRING_construct_sprintf("PUBLISH | IS_DUP: %s | RETAIN: %d | QOS: %s | TOPIC_NAME: %s", isDuplicateMsg ? TRUE_CONST : FALSE_CONST,
659 isRetainMsg ? 1 : 0, MU_ENUM_TO_STRING(QOS_VALUE, qosValue), topicName);
660 }
661#endif
662 uint16_t packetId = 0;
663 numberOfBytesToBeRead = packetLength - (iterator - initialPos);
664 if (qosValue != DELIVER_AT_MOST_ONCE)
665 {
666 packetId = byteutil_read_uint16(&iterator, numberOfBytesToBeRead);
667#ifndef NO_LOGGING
668 if (is_trace_enabled(mqtt_client))
669 {
670 STRING_sprintf(trace_log, " | PACKET_ID: %"PRIu16, packetId);
671 }
672#endif
673 }
674 if ((qosValue != DELIVER_AT_MOST_ONCE) && (packetId == 0))
675 {
676 LogError("Publish MSG: packetId=0, invalid");
677 set_error_callback(mqtt_client, MQTT_CLIENT_PARSE_ERROR);
678 }
679 else
680 {
681 numberOfBytesToBeRead = packetLength - (iterator - initialPos);
682
683 MQTT_MESSAGE_HANDLE msgHandle = mqttmessage_create_in_place(packetId, topicName, qosValue, iterator, numberOfBytesToBeRead);
684 if (msgHandle == NULL)
685 {
686 LogError("failure in mqttmessage_create");
687 set_error_callback(mqtt_client, MQTT_CLIENT_MEMORY_ERROR);
688 }
689 else if (mqttmessage_setIsDuplicateMsg(msgHandle, isDuplicateMsg) != 0 ||
690 mqttmessage_setIsRetained(msgHandle, isRetainMsg) != 0)
691 {
692 LogError("failure setting mqtt message property");
693 set_error_callback(mqtt_client, MQTT_CLIENT_MEMORY_ERROR);
694 }
695 else
696 {
697#ifndef NO_LOGGING
698 if (is_trace_enabled(mqtt_client))
699 {
700 STRING_sprintf(trace_log, " | PAYLOAD_LEN: %lu", (unsigned long)numberOfBytesToBeRead);
701 log_incoming_trace(mqtt_client, trace_log);
702 }
703#endif
704 mqtt_client->fnMessageRecv(msgHandle, mqtt_client->ctx);
705
706 BUFFER_HANDLE pubRel = NULL;
707 if (qosValue == DELIVER_EXACTLY_ONCE)
708 {
709 pubRel = mqtt_codec_publishReceived(packetId);
710 if (pubRel == NULL)
711 {
712 LogError("Failed to allocate publish receive message.");
713 set_error_callback(mqtt_client, MQTT_CLIENT_MEMORY_ERROR);
714 }
715 }
716 else if (qosValue == DELIVER_AT_LEAST_ONCE)
717 {
718 pubRel = mqtt_codec_publishAck(packetId);
719 if (pubRel == NULL)
720 {
721 LogError("Failed to allocate publish ack message.");
722 set_error_callback(mqtt_client, MQTT_CLIENT_MEMORY_ERROR);
723 }
724 }
725 if (pubRel != NULL)
726 {
727 size_t size = BUFFER_length(pubRel);
728 if (sendPacketItem(mqtt_client, BUFFER_u_char(pubRel), size) != 0)
729 {
730 LogError("Failed sending publish reply.");
731 set_error_callback(mqtt_client, MQTT_CLIENT_COMMUNICATION_ERROR);
732 }
733 BUFFER_delete(pubRel);
734 }
735 }
736 mqttmessage_destroy(msgHandle);
737 }
738
739 if (trace_log != NULL)
740 {
741 STRING_delete(trace_log);
742 }
743
744 free(topicName);
745 }
746}
747
748static void recvCompleteCallback(void* context, CONTROL_PACKET_TYPE packet, int flags, BUFFER_HANDLE headerData)
749{
750 MQTT_CLIENT* mqtt_client = (MQTT_CLIENT*)context;
751 if (mqtt_client != NULL)
752 {
753 size_t packetLength = 0;
754 uint8_t* iterator = NULL;
755 if (headerData != NULL)
756 {
757 packetLength = BUFFER_length(headerData);
758 iterator = BUFFER_u_char(headerData);
759 }
760
761#ifdef ENABLE_RAW_TRACE
762 logIncomingRawTrace(mqtt_client, packet, (uint8_t)flags, iterator, packetLength);
763#endif
764 if ((iterator != NULL && packetLength > 0) || packet == PINGRESP_TYPE)
765 {
766 switch (packet)
767 {
768 case CONNACK_TYPE:
769 {
770 /*Codes_SRS_MQTT_CLIENT_07_028: [If the actionResult parameter is of type CONNECT_ACK then the msgInfo value shall be a CONNECT_ACK structure.]*/
771 CONNECT_ACK connack = { 0 };
772 connack.isSessionPresent = (byteutil_readByte(&iterator) == 0x1) ? true : false;
773 uint8_t rc = byteutil_readByte(&iterator);
774 connack.returnCode =
775 (rc < ((uint8_t)CONN_REFUSED_UNKNOWN)) ?
776 (CONNECT_RETURN_CODE)rc : CONN_REFUSED_UNKNOWN;
777
778#ifndef NO_LOGGING
779 if (is_trace_enabled(mqtt_client))
780 {
781 STRING_HANDLE trace_log = STRING_construct_sprintf("CONNACK | SESSION_PRESENT: %s | RETURN_CODE: 0x%x", connack.isSessionPresent ? TRUE_CONST : FALSE_CONST, connack.returnCode);
782 log_incoming_trace(mqtt_client, trace_log);
783 STRING_delete(trace_log);
784 }
785#endif
786 mqtt_client->fnOperationCallback(mqtt_client, MQTT_CLIENT_ON_CONNACK, (void*)&connack, mqtt_client->ctx);
787
788 if (connack.returnCode == CONNECTION_ACCEPTED)
789 {
790 mqtt_client->mqtt_status |= MQTT_STATUS_CLIENT_CONNECTED;
791 }
792 break;
793 }
794 case PUBLISH_TYPE:
795 {
796 ProcessPublishMessage(mqtt_client, iterator, packetLength, flags);
797 break;
798 }
799 case PUBACK_TYPE:
800 case PUBREC_TYPE:
801 case PUBREL_TYPE:
802 case PUBCOMP_TYPE:
803 {
804 /*Codes_SRS_MQTT_CLIENT_07_029: [If the actionResult parameter are of types PUBACK_TYPE, PUBREC_TYPE, PUBREL_TYPE or PUBCOMP_TYPE then the msgInfo value shall be a PUBLISH_ACK structure.]*/
805 MQTT_CLIENT_EVENT_RESULT action = (packet == PUBACK_TYPE) ? MQTT_CLIENT_ON_PUBLISH_ACK :
806 (packet == PUBREC_TYPE) ? MQTT_CLIENT_ON_PUBLISH_RECV :
807 (packet == PUBREL_TYPE) ? MQTT_CLIENT_ON_PUBLISH_REL : MQTT_CLIENT_ON_PUBLISH_COMP;
808
809 PUBLISH_ACK publish_ack = { 0 };
810 publish_ack.packetId = byteutil_read_uint16(&iterator, packetLength);
811
812#ifndef NO_LOGGING
813 if (is_trace_enabled(mqtt_client))
814 {
815 STRING_HANDLE trace_log = STRING_construct_sprintf("%s | PACKET_ID: %"PRIu16, packet == PUBACK_TYPE ? "PUBACK" : (packet == PUBREC_TYPE) ? "PUBREC" : (packet == PUBREL_TYPE) ? "PUBREL" : "PUBCOMP",
816 publish_ack.packetId);
817
818 log_incoming_trace(mqtt_client, trace_log);
819 STRING_delete(trace_log);
820 }
821#endif
822 BUFFER_HANDLE pubRel = NULL;
823 mqtt_client->fnOperationCallback(mqtt_client, action, (void*)&publish_ack, mqtt_client->ctx);
824 if (packet == PUBREC_TYPE)
825 {
826 pubRel = mqtt_codec_publishRelease(publish_ack.packetId);
827 if (pubRel == NULL)
828 {
829 LogError("Failed to allocate publish release message.");
830 set_error_callback(mqtt_client, MQTT_CLIENT_MEMORY_ERROR);
831 }
832 }
833 else if (packet == PUBREL_TYPE)
834 {
835 pubRel = mqtt_codec_publishComplete(publish_ack.packetId);
836 if (pubRel == NULL)
837 {
838 LogError("Failed to allocate publish complete message.");
839 set_error_callback(mqtt_client, MQTT_CLIENT_MEMORY_ERROR);
840 }
841 }
842 if (pubRel != NULL)
843 {
844 size_t size = BUFFER_length(pubRel);
845 if (sendPacketItem(mqtt_client, BUFFER_u_char(pubRel), size) != 0)
846 {
847 LogError("Failed sending publish reply.");
848 set_error_callback(mqtt_client, MQTT_CLIENT_COMMUNICATION_ERROR);
849 }
850 BUFFER_delete(pubRel);
851 }
852 break;
853 }
854 case SUBACK_TYPE:
855 {
856
857 /*Codes_SRS_MQTT_CLIENT_07_030: [If the actionResult parameter is of type SUBACK_TYPE then the msgInfo value shall be a SUBSCRIBE_ACK structure.]*/
858 SUBSCRIBE_ACK suback = { 0 };
859
860 size_t remainLen = packetLength;
861 suback.packetId = byteutil_read_uint16(&iterator, packetLength);
862 remainLen -= 2;
863
864#ifndef NO_LOGGING
865 STRING_HANDLE trace_log = NULL;
866 if (is_trace_enabled(mqtt_client))
867 {
868 trace_log = STRING_construct_sprintf("SUBACK | PACKET_ID: %"PRIu16, suback.packetId);
869 }
870#endif
871 // Allocate the remaining len
872 suback.qosReturn = (QOS_VALUE*)malloc(sizeof(QOS_VALUE)*remainLen);
873 if (suback.qosReturn != NULL)
874 {
875 while (remainLen > 0)
876 {
877 uint8_t qosRet = byteutil_readByte(&iterator);
878 suback.qosReturn[suback.qosCount++] =
879 (qosRet <= ((uint8_t)DELIVER_EXACTLY_ONCE)) ?
880 (QOS_VALUE)qosRet : DELIVER_FAILURE;
881 remainLen--;
882#ifndef NO_LOGGING
883 if (is_trace_enabled(mqtt_client))
884 {
885 STRING_sprintf(trace_log, " | RETURN_CODE: %"PRIu16, suback.qosReturn[suback.qosCount-1]);
886 }
887#endif
888 }
889
890#ifndef NO_LOGGING
891 if (is_trace_enabled(mqtt_client))
892 {
893 log_incoming_trace(mqtt_client, trace_log);
894 STRING_delete(trace_log);
895 }
896#endif
897 mqtt_client->fnOperationCallback(mqtt_client, MQTT_CLIENT_ON_SUBSCRIBE_ACK, (void*)&suback, mqtt_client->ctx);
898 free(suback.qosReturn);
899 }
900 else
901 {
902 LogError("allocation of quality of service value failed.");
903 set_error_callback(mqtt_client, MQTT_CLIENT_MEMORY_ERROR);
904 }
905 break;
906 }
907 case UNSUBACK_TYPE:
908 {
909 /*Codes_SRS_MQTT_CLIENT_07_031: [If the actionResult parameter is of type UNSUBACK_TYPE then the msgInfo value shall be a UNSUBSCRIBE_ACK structure.]*/
910 UNSUBSCRIBE_ACK unsuback = { 0 };
911 unsuback.packetId = byteutil_read_uint16(&iterator, packetLength);
912
913#ifndef NO_LOGGING
914 if (is_trace_enabled(mqtt_client))
915 {
916 STRING_HANDLE trace_log = STRING_construct_sprintf("UNSUBACK | PACKET_ID: %"PRIu16, unsuback.packetId);
917 log_incoming_trace(mqtt_client, trace_log);
918 STRING_delete(trace_log);
919 }
920#endif
921 mqtt_client->fnOperationCallback(mqtt_client, MQTT_CLIENT_ON_UNSUBSCRIBE_ACK, (void*)&unsuback, mqtt_client->ctx);
922 break;
923 }
924 case PINGRESP_TYPE:
925 mqtt_client->timeSincePing = 0;
926#ifndef NO_LOGGING
927 if (is_trace_enabled(mqtt_client))
928 {
929 STRING_HANDLE trace_log = STRING_construct_sprintf("PINGRESP");
930 log_incoming_trace(mqtt_client, trace_log);
931 STRING_delete(trace_log);
932 }
933#endif
934 // Forward ping response to operation callback
935 mqtt_client->fnOperationCallback(mqtt_client, MQTT_CLIENT_ON_PING_RESPONSE, NULL, mqtt_client->ctx);
936 break;
937 default:
938 break;
939 }
940 }
941 }
942 else
943 {
944 LogError("recvCompleteCallback context failed.");
945 }
946}
947
948MQTT_CLIENT_HANDLE mqtt_client_init(ON_MQTT_MESSAGE_RECV_CALLBACK msgRecv, ON_MQTT_OPERATION_CALLBACK operation_cb, void* opCallbackCtx, ON_MQTT_ERROR_CALLBACK onErrorCallBack, void* errorCBCtx)
949{
950 MQTT_CLIENT* result;
951 /*Codes_SRS_MQTT_CLIENT_07_001: [If the parameters ON_MQTT_MESSAGE_RECV_CALLBACK is NULL then mqttclient_init shall return NULL.]*/
952 if (msgRecv == NULL || operation_cb == NULL)
953 {
954 LogError("Invalid parameter specified msgRecv: %p, operation_cb: %p", msgRecv, operation_cb);
955 result = NULL;
956 }
957 else
958 {
959 result = malloc(sizeof(MQTT_CLIENT));
960 if (result == NULL)
961 {
962 /*Codes_SRS_MQTT_CLIENT_07_002: [If any failure is encountered then mqttclient_init shall return NULL.]*/
963 LogError("mqtt_client_init failure: Allocation Failure");
964 }
965 else
966 {
967 memset(result, 0, sizeof(MQTT_CLIENT));
968 /*Codes_SRS_MQTT_CLIENT_07_003: [mqttclient_init shall allocate MQTTCLIENT_DATA_INSTANCE and return the MQTTCLIENT_HANDLE on success.]*/
969 result->packetState = UNKNOWN_TYPE;
970 result->fnOperationCallback = operation_cb;
971 result->ctx = opCallbackCtx;
972 result->fnMessageRecv = msgRecv;
973 result->fnOnErrorCallBack = onErrorCallBack;
974 result->errorCBCtx = errorCBCtx;
975 result->qosValue = DELIVER_AT_MOST_ONCE;
976 result->packetTickCntr = tickcounter_create();
977 result->maxPingRespTime = DEFAULT_MAX_PING_RESPONSE_TIME;
978 if (result->packetTickCntr == NULL)
979 {
980 /*Codes_SRS_MQTT_CLIENT_07_002: [If any failure is encountered then mqttclient_init shall return NULL.]*/
981 LogError("mqtt_client_init failure: tickcounter_create failure");
982 free(result);
983 result = NULL;
984 }
985 else
986 {
987 result->codec_handle = mqtt_codec_create(recvCompleteCallback, result);
988 if (result->codec_handle == NULL)
989 {
990 /*Codes_SRS_MQTT_CLIENT_07_002: [If any failure is encountered then mqttclient_init shall return NULL.]*/
991 LogError("mqtt_client_init failure: mqtt_codec_create failure");
992 tickcounter_destroy(result->packetTickCntr);
993 free(result);
994 result = NULL;
995 }
996 }
997 }
998 }
999 return result;
1000}
1001
1002void mqtt_client_deinit(MQTT_CLIENT_HANDLE handle)
1003{
1004 /*Codes_SRS_MQTT_CLIENT_07_004: [If the parameter handle is NULL then function mqtt_client_deinit shall do nothing.]*/
1005 if (handle != NULL)
1006 {
1007 /*Codes_SRS_MQTT_CLIENT_07_005: [mqtt_client_deinit shall deallocate all memory allocated in this unit.]*/
1008 MQTT_CLIENT* mqtt_client = (MQTT_CLIENT*)handle;
1009 tickcounter_destroy(mqtt_client->packetTickCntr);
1010 mqtt_codec_destroy(mqtt_client->codec_handle);
1011 clear_mqtt_options(mqtt_client);
1012 free(mqtt_client);
1013 }
1014}
1015
1016int mqtt_client_connect(MQTT_CLIENT_HANDLE handle, XIO_HANDLE xioHandle, MQTT_CLIENT_OPTIONS* mqttOptions)
1017{
1018 int result;
1019 /*SRS_MQTT_CLIENT_07_006: [If any of the parameters handle, ioHandle, or mqttOptions are NULL then mqtt_client_connect shall return a non-zero value.]*/
1020 if (handle == NULL || mqttOptions == NULL || xioHandle == NULL)
1021 {
1022 LogError("mqtt_client_connect: NULL argument (handle = %p, mqttOptions = %p, xioHandle: %p)", handle, mqttOptions, xioHandle);
1023 result = MU_FAILURE;
1024 }
1025 else
1026 {
1027 MQTT_CLIENT* mqtt_client = (MQTT_CLIENT*)handle;
1028 mqtt_client->xioHandle = xioHandle;
1029 mqtt_client->packetState = UNKNOWN_TYPE;
1030 mqtt_client->qosValue = mqttOptions->qualityOfServiceValue;
1031 mqtt_client->keepAliveInterval = mqttOptions->keepAliveInterval;
1032 mqtt_client->maxPingRespTime = (DEFAULT_MAX_PING_RESPONSE_TIME < mqttOptions->keepAliveInterval/2) ? DEFAULT_MAX_PING_RESPONSE_TIME : mqttOptions->keepAliveInterval/2;
1033 if (cloneMqttOptions(mqtt_client, mqttOptions) != 0)
1034 {
1035 LogError("Error: Clone Mqtt Options failed");
1036 result = MU_FAILURE;
1037 }
1038 /*Codes_SRS_MQTT_CLIENT_07_008: [mqtt_client_connect shall open the XIO_HANDLE by calling into the xio_open interface.]*/
1039 else if (xio_open(xioHandle, onOpenComplete, mqtt_client, onBytesReceived, mqtt_client, onIoError, mqtt_client) != 0)
1040 {
1041 /*Codes_SRS_MQTT_CLIENT_07_007: [If any failure is encountered then mqtt_client_connect shall return a non-zero value.]*/
1042 LogError("Error: io_open failed");
1043 result = MU_FAILURE;
1044 mqtt_client->xioHandle = NULL;
1045 // Remove cloned options
1046 clear_mqtt_options(mqtt_client);
1047 }
1048 else
1049 {
1050 result = 0;
1051 }
1052 }
1053 return result;
1054}
1055
1056int mqtt_client_publish(MQTT_CLIENT_HANDLE handle, MQTT_MESSAGE_HANDLE msgHandle)
1057{
1058 int result;
1059 MQTT_CLIENT* mqtt_client = (MQTT_CLIENT*)handle;
1060 if (mqtt_client == NULL || msgHandle == NULL)
1061 {
1062 /*Codes_SRS_MQTT_CLIENT_07_019: [If one of the parameters handle or msgHandle is NULL then mqtt_client_publish shall return a non-zero value.]*/
1063 LogError("Invalid parameter specified mqtt_client: %p, msgHandle: %p", mqtt_client, msgHandle);
1064 result = MU_FAILURE;
1065 }
1066 else
1067 {
1068 /*Codes_SRS_MQTT_CLIENT_07_021: [mqtt_client_publish shall get the message information from the MQTT_MESSAGE_HANDLE.]*/
1069 const APP_PAYLOAD* payload = mqttmessage_getApplicationMsg(msgHandle);
1070 if (payload == NULL)
1071 {
1072 /*Codes_SRS_MQTT_CLIENT_07_020: [If any failure is encountered then mqtt_client_unsubscribe shall return a non-zero value.]*/
1073 LogError("Error: mqttmessage_getApplicationMsg failed");
1074 result = MU_FAILURE;
1075 }
1076 else
1077 {
1078 STRING_HANDLE trace_log = construct_trace_log_handle(mqtt_client);
1079
1080 QOS_VALUE qos = mqttmessage_getQosType(msgHandle);
1081 bool isDuplicate = mqttmessage_getIsDuplicateMsg(msgHandle);
1082 bool isRetained = mqttmessage_getIsRetained(msgHandle);
1083 uint16_t packetId = mqttmessage_getPacketId(msgHandle);
1084 const char* topicName = mqttmessage_getTopicName(msgHandle);
1085 BUFFER_HANDLE publishPacket = mqtt_codec_publish(qos, isDuplicate, isRetained, packetId, topicName, payload->message, payload->length, trace_log);
1086 if (publishPacket == NULL)
1087 {
1088 /*Codes_SRS_MQTT_CLIENT_07_020: [If any failure is encountered then mqtt_client_unsubscribe shall return a non-zero value.]*/
1089 LogError("Error: mqtt_codec_publish failed");
1090 result = MU_FAILURE;
1091 }
1092 else
1093 {
1094 mqtt_client->packetState = PUBLISH_TYPE;
1095
1096 /*Codes_SRS_MQTT_CLIENT_07_022: [On success mqtt_client_publish shall send the MQTT SUBCRIBE packet to the endpoint.]*/
1097 size_t size = BUFFER_length(publishPacket);
1098 if (sendPacketItem(mqtt_client, BUFFER_u_char(publishPacket), size) != 0)
1099 {
1100 /*Codes_SRS_MQTT_CLIENT_07_020: [If any failure is encountered then mqtt_client_unsubscribe shall return a non-zero value.]*/
1101 LogError("Error: mqtt_client_publish send failed");
1102 result = MU_FAILURE;
1103 }
1104 else
1105 {
1106 log_outgoing_trace(mqtt_client, trace_log);
1107 result = 0;
1108 }
1109 BUFFER_delete(publishPacket);
1110 }
1111 if (trace_log != NULL)
1112 {
1113 STRING_delete(trace_log);
1114 }
1115 }
1116 }
1117 return result;
1118}
1119
1120int mqtt_client_subscribe(MQTT_CLIENT_HANDLE handle, uint16_t packetId, SUBSCRIBE_PAYLOAD* subscribeList, size_t count)
1121{
1122 int result;
1123 MQTT_CLIENT* mqtt_client = (MQTT_CLIENT*)handle;
1124 if (mqtt_client == NULL || subscribeList == NULL || count == 0 || packetId == 0)
1125 {
1126 /*Codes_SRS_MQTT_CLIENT_07_013: [If any of the parameters handle, subscribeList is NULL or count is 0 then mqtt_client_subscribe shall return a non-zero value.]*/
1127 LogError("Invalid parameter specified mqtt_client: %p, subscribeList: %p, count: %lu, packetId: %d", mqtt_client, subscribeList, (unsigned long)count, packetId);
1128 result = MU_FAILURE;
1129 }
1130 else
1131 {
1132 STRING_HANDLE trace_log = construct_trace_log_handle(mqtt_client);
1133
1134 BUFFER_HANDLE subPacket = mqtt_codec_subscribe(packetId, subscribeList, count, trace_log);
1135 if (subPacket == NULL)
1136 {
1137 /*Codes_SRS_MQTT_CLIENT_07_014: [If any failure is encountered then mqtt_client_subscribe shall return a non-zero value.]*/
1138 LogError("Error: mqtt_codec_subscribe failed");
1139 result = MU_FAILURE;
1140 }
1141 else
1142 {
1143 mqtt_client->packetState = SUBSCRIBE_TYPE;
1144
1145 size_t size = BUFFER_length(subPacket);
1146 /*Codes_SRS_MQTT_CLIENT_07_015: [On success mqtt_client_subscribe shall send the MQTT SUBCRIBE packet to the endpoint.]*/
1147 if (sendPacketItem(mqtt_client, BUFFER_u_char(subPacket), size) != 0)
1148 {
1149 /*Codes_SRS_MQTT_CLIENT_07_014: [If any failure is encountered then mqtt_client_subscribe shall return a non-zero value.]*/
1150 LogError("Error: mqtt_client_subscribe send failed");
1151 result = MU_FAILURE;
1152 }
1153 else
1154 {
1155 log_outgoing_trace(mqtt_client, trace_log);
1156 result = 0;
1157 }
1158 BUFFER_delete(subPacket);
1159 }
1160 if (trace_log != NULL)
1161 {
1162 STRING_delete(trace_log);
1163 }
1164 }
1165 return result;
1166}
1167
1168int mqtt_client_unsubscribe(MQTT_CLIENT_HANDLE handle, uint16_t packetId, const char** unsubscribeList, size_t count)
1169{
1170 int result;
1171 MQTT_CLIENT* mqtt_client = (MQTT_CLIENT*)handle;
1172 if (mqtt_client == NULL || unsubscribeList == NULL || count == 0 || packetId == 0)
1173 {
1174 /*Codes_SRS_MQTT_CLIENT_07_016: [If any of the parameters handle, unsubscribeList is NULL or count is 0 then mqtt_client_unsubscribe shall return a non-zero value.]*/
1175 LogError("Invalid parameter specified mqtt_client: %p, unsubscribeList: %p, count: %lu, packetId: %d", mqtt_client, unsubscribeList, (unsigned long)count, packetId);
1176 result = MU_FAILURE;
1177 }
1178 else
1179 {
1180 STRING_HANDLE trace_log = construct_trace_log_handle(mqtt_client);
1181
1182 BUFFER_HANDLE unsubPacket = mqtt_codec_unsubscribe(packetId, unsubscribeList, count, trace_log);
1183 if (unsubPacket == NULL)
1184 {
1185 /*Codes_SRS_MQTT_CLIENT_07_017: [If any failure is encountered then mqtt_client_unsubscribe shall return a non-zero value.]*/
1186 LogError("Error: mqtt_codec_unsubscribe failed");
1187 result = MU_FAILURE;
1188 }
1189 else
1190 {
1191 mqtt_client->packetState = UNSUBSCRIBE_TYPE;
1192
1193 size_t size = BUFFER_length(unsubPacket);
1194 /*Codes_SRS_MQTT_CLIENT_07_018: [On success mqtt_client_unsubscribe shall send the MQTT SUBCRIBE packet to the endpoint.]*/
1195 if (sendPacketItem(mqtt_client, BUFFER_u_char(unsubPacket), size) != 0)
1196 {
1197 /*Codes_SRS_MQTT_CLIENT_07_017: [If any failure is encountered then mqtt_client_unsubscribe shall return a non-zero value.].]*/
1198 LogError("Error: mqtt_client_unsubscribe send failed");
1199 result = MU_FAILURE;
1200 }
1201 else
1202 {
1203 log_outgoing_trace(mqtt_client, trace_log);
1204 result = 0;
1205 }
1206 BUFFER_delete(unsubPacket);
1207 }
1208 if (trace_log != NULL)
1209 {
1210 STRING_delete(trace_log);
1211 }
1212 }
1213 return result;
1214}
1215
1216int mqtt_client_disconnect(MQTT_CLIENT_HANDLE handle, ON_MQTT_DISCONNECTED_CALLBACK callback, void* ctx)
1217{
1218 int result;
1219 MQTT_CLIENT* mqtt_client = (MQTT_CLIENT*)handle;
1220 if (mqtt_client == NULL)
1221 {
1222 /*Codes_SRS_MQTT_CLIENT_07_010: [If the parameters handle is NULL then mqtt_client_disconnect shall return a non-zero value.]*/
1223 result = MU_FAILURE;
1224 }
1225 else
1226 {
1227 if (mqtt_client->mqtt_status & MQTT_STATUS_CLIENT_CONNECTED)
1228 {
1229 BUFFER_HANDLE disconnectPacket = mqtt_codec_disconnect();
1230 if (disconnectPacket == NULL)
1231 {
1232 /*Codes_SRS_MQTT_CLIENT_07_011: [If any failure is encountered then mqtt_client_disconnect shall return a non-zero value.]*/
1233 LogError("Error: mqtt_client_disconnect failed");
1234 mqtt_client->packetState = PACKET_TYPE_ERROR;
1235 result = MU_FAILURE;
1236 }
1237 else
1238 {
1239 /* Codes_SRS_MQTT_CLIENT_07_037: [ if callback is not NULL callback shall be called once the mqtt connection has been disconnected ] */
1240 mqtt_client->disconnect_cb = callback;
1241 mqtt_client->disconnect_ctx = ctx;
1242 mqtt_client->packetState = DISCONNECT_TYPE;
1243
1244 size_t size = BUFFER_length(disconnectPacket);
1245 /*Codes_SRS_MQTT_CLIENT_07_012: [On success mqtt_client_disconnect shall send the MQTT DISCONNECT packet to the endpoint.]*/
1246 if (sendPacketItem(mqtt_client, BUFFER_u_char(disconnectPacket), size) != 0)
1247 {
1248 /*Codes_SRS_MQTT_CLIENT_07_011: [If any failure is encountered then mqtt_client_disconnect shall return a non-zero value.]*/
1249 LogError("Error: mqtt_client_disconnect send failed");
1250 result = MU_FAILURE;
1251 }
1252 else
1253 {
1254 if (is_trace_enabled(mqtt_client))
1255 {
1256 STRING_HANDLE trace_log = STRING_construct("DISCONNECT");
1257 log_outgoing_trace(mqtt_client, trace_log);
1258 STRING_delete(trace_log);
1259 }
1260 result = 0;
1261 }
1262 BUFFER_delete(disconnectPacket);
1263 }
1264 clear_mqtt_options(mqtt_client);
1265 }
1266 else
1267 {
1268 // If the client is not connected then just close the underlying socket
1269 mqtt_client->disconnect_cb = callback;
1270 mqtt_client->disconnect_ctx = ctx;
1271
1272 close_connection(mqtt_client);
1273 clear_mqtt_options(mqtt_client);
1274 result = 0;
1275 }
1276 }
1277 return result;
1278}
1279
1280void mqtt_client_dowork(MQTT_CLIENT_HANDLE handle)
1281{
1282 MQTT_CLIENT* mqtt_client = (MQTT_CLIENT*)handle;
1283 /*Codes_SRS_MQTT_CLIENT_18_001: [If the client is disconnected, mqtt_client_dowork shall do nothing.]*/
1284 /*Codes_SRS_MQTT_CLIENT_07_023: [If the parameter handle is NULL then mqtt_client_dowork shall do nothing.]*/
1285 if (mqtt_client != NULL && mqtt_client->xioHandle != NULL)
1286 {
1287 if (mqtt_client->mqtt_status & MQTT_STATUS_PENDING_CLOSE)
1288 {
1289 close_connection(mqtt_client);
1290 // turn off pending close
1291 mqtt_client->mqtt_status &= ~MQTT_STATUS_PENDING_CLOSE;
1292 }
1293 else
1294 {
1295 /*Codes_SRS_MQTT_CLIENT_07_024: [mqtt_client_dowork shall call the xio_dowork function to complete operations.]*/
1296 xio_dowork(mqtt_client->xioHandle);
1297
1298 /*Codes_SRS_MQTT_CLIENT_07_025: [mqtt_client_dowork shall retrieve the the last packet send value and ...]*/
1299 if (mqtt_client->mqtt_status & MQTT_STATUS_SOCKET_CONNECTED &&
1300 mqtt_client->mqtt_status & MQTT_STATUS_CLIENT_CONNECTED &&
1301 mqtt_client->keepAliveInterval > 0)
1302 {
1303 tickcounter_ms_t current_ms;
1304 if (tickcounter_get_current_ms(mqtt_client->packetTickCntr, &current_ms) != 0)
1305 {
1306 LogError("Error: tickcounter_get_current_ms failed");
1307 }
1308 else
1309 {
1310 /* Codes_SRS_MQTT_CLIENT_07_035: [If the timeSincePing has expired past the maxPingRespTime then mqtt_client_dowork shall call the Error Callback function with the message MQTT_CLIENT_NO_PING_RESPONSE] */
1311 if (mqtt_client->timeSincePing > 0 && ((current_ms - mqtt_client->timeSincePing)/1000) > mqtt_client->maxPingRespTime)
1312 {
1313 // We haven't gotten a ping response in the alloted time
1314 set_error_callback(mqtt_client, MQTT_CLIENT_NO_PING_RESPONSE);
1315 mqtt_client->timeSincePing = 0;
1316 mqtt_client->packetSendTimeMs = 0;
1317 mqtt_client->packetState = UNKNOWN_TYPE;
1318 }
1319 else if (((current_ms - mqtt_client->packetSendTimeMs) / 1000) >= mqtt_client->keepAliveInterval)
1320 {
1321 /*Codes_SRS_MQTT_CLIENT_07_026: [if keepAliveInternal is > 0 and the send time is greater than the MQTT KeepAliveInterval then it shall construct an MQTT PINGREQ packet.]*/
1322 BUFFER_HANDLE pingPacket = mqtt_codec_ping();
1323 if (pingPacket != NULL)
1324 {
1325 size_t size = BUFFER_length(pingPacket);
1326 (void)sendPacketItem(mqtt_client, BUFFER_u_char(pingPacket), size);
1327 BUFFER_delete(pingPacket);
1328 (void)tickcounter_get_current_ms(mqtt_client->packetTickCntr, &mqtt_client->timeSincePing);
1329
1330 if (is_trace_enabled(mqtt_client))
1331 {
1332 STRING_HANDLE trace_log = STRING_construct("PINGREQ");
1333 log_outgoing_trace(mqtt_client, trace_log);
1334 STRING_delete(trace_log);
1335 }
1336 }
1337 }
1338 }
1339 }
1340 }
1341 }
1342}
1343
1344void mqtt_client_set_trace(MQTT_CLIENT_HANDLE handle, bool traceOn, bool rawBytesOn)
1345{
1346 AZURE_UNREFERENCED_PARAMETER(handle);
1347 AZURE_UNREFERENCED_PARAMETER(traceOn);
1348 AZURE_UNREFERENCED_PARAMETER(rawBytesOn);
1349#ifndef NO_LOGGING
1350 if (handle != NULL)
1351 {
1352 if (traceOn)
1353 {
1354 handle->mqtt_flags |= MQTT_FLAGS_LOG_TRACE;
1355 }
1356 else
1357 {
1358 handle->mqtt_flags &= ~MQTT_FLAGS_LOG_TRACE;
1359 }
1360#ifdef ENABLE_RAW_TRACE
1361 if (rawBytesOn)
1362 {
1363 handle->mqtt_flags |= MQTT_FLAGS_RAW_TRACE;
1364 }
1365 else
1366 {
1367 handle->mqtt_flags &= ~MQTT_FLAGS_RAW_TRACE;
1368 }
1369#endif
1370 }
1371#endif
1372}
Note: See TracBrowser for help on using the repository browser.