source: azure_iot_hub/trunk/azure_iothub/umqtt/src/mqtt_client.c@ 399

Last change on this file since 399 was 399, checked in by coas-nagasima, 5 years ago

更新

  • Property svn:eol-style set to native
  • Property svn:mime-type set to text/x-csrc;charset=UTF-8
File size: 49.2 KB
Line 
1// Copyright (c) Microsoft. All rights reserved.
2// Licensed under the MIT license. See LICENSE file in the project root for full license information.
3
4#include <stdlib.h>
5#include "azure_c_shared_utility/agenttime.h"
6#include "azure_c_shared_utility/const_defines.h"
7#include "azure_c_shared_utility/crt_abstractions.h"
8#include "azure_c_shared_utility/gballoc.h"
9#include "azure_c_shared_utility/optimize_size.h"
10#include "azure_c_shared_utility/macro_utils.h"
11#include "azure_c_shared_utility/platform.h"
12#include "azure_c_shared_utility/strings.h"
13#include "azure_c_shared_utility/threadapi.h"
14#include "azure_c_shared_utility/tickcounter.h"
15#include "azure_c_shared_utility/xlogging.h"
16#include "azure_c_shared_utility/macro_utils.h"
17
18#include "azure_umqtt_c/mqtt_client.h"
19#include "azure_umqtt_c/mqtt_codec.h"
20#include <inttypes.h>
21
/* Offset constant kept for the variable-header handling in this file. */
#define VARIABLE_HEADER_OFFSET          2

/* Bit masks applied to the flags nibble of a received PUBLISH packet. */
#define RETAIN_FLAG_MASK                0x01
#define QOS_LEAST_ONCE_FLAG_MASK        0x02
#define QOS_EXACTLY_ONCE_FLAG_MASK      0x04
#define DUPLICATE_FLAG_MASK             0x08

/* Mask that isolates the packet-type bits of the first packet byte. */
#define CONNECT_PACKET_MASK             0xF0

/* Size of the stack buffers that receive the "HH:MM:SS" trace time stamp. */
#define TIME_MAX_BUFFER                 16

/* Upper bound used when computing maxPingRespTime (original note: % of time to send pings). */
#define DEFAULT_MAX_PING_RESPONSE_TIME  80

/* Maximum number of xio_dowork iterations spent waiting for a close to complete. */
#define MAX_CLOSE_RETRIES               20

/* Textual forms of boolean values used when building trace strings. */
static const char* const TRUE_CONST = "true";
static const char* const FALSE_CONST = "false";
34
35MU_DEFINE_ENUM_STRINGS(QOS_VALUE, QOS_VALUE_VALUES);
36
37typedef struct MQTT_CLIENT_TAG
38{
39 XIO_HANDLE xioHandle;
40 MQTTCODEC_HANDLE codec_handle;
41 CONTROL_PACKET_TYPE packetState;
42 TICK_COUNTER_HANDLE packetTickCntr;
43 tickcounter_ms_t packetSendTimeMs;
44 ON_MQTT_OPERATION_CALLBACK fnOperationCallback;
45 ON_MQTT_MESSAGE_RECV_CALLBACK fnMessageRecv;
46 void* ctx;
47 ON_MQTT_ERROR_CALLBACK fnOnErrorCallBack;
48 void* errorCBCtx;
49 ON_MQTT_DISCONNECTED_CALLBACK disconnect_cb;
50 void* disconnect_ctx;
51 QOS_VALUE qosValue;
52 uint16_t keepAliveInterval;
53 MQTT_CLIENT_OPTIONS mqttOptions;
54 bool clientConnected;
55 bool socketConnected;
56 bool logTrace;
57 bool rawBytesTrace;
58 tickcounter_ms_t timeSincePing;
59 uint16_t maxPingRespTime;
60} MQTT_CLIENT;
61
62static void on_connection_closed(void* context)
63{
64 MQTT_CLIENT* mqtt_client = (MQTT_CLIENT*)context;
65 if (mqtt_client != NULL)
66 {
67 mqtt_client->socketConnected = false;
68 mqtt_client->clientConnected = false;
69 if (mqtt_client->disconnect_cb)
70 {
71 mqtt_client->disconnect_cb(mqtt_client->disconnect_ctx);
72 }
73 }
74}
75
76static void close_connection(MQTT_CLIENT* mqtt_client)
77{
78 if (mqtt_client->socketConnected)
79 {
80 (void)xio_close(mqtt_client->xioHandle, on_connection_closed, mqtt_client);
81 if (mqtt_client->disconnect_cb == NULL)
82 {
83 size_t counter = 0;
84 do
85 {
86 xio_dowork(mqtt_client->xioHandle);
87 counter++;
88 ThreadAPI_Sleep(2);
89 } while (mqtt_client->clientConnected && counter < MAX_CLOSE_RETRIES);
90 }
91 // Clear the handle because we don't use it anymore
92 mqtt_client->xioHandle = NULL;
93 }
94 else
95 {
96 if (mqtt_client->disconnect_cb)
97 {
98 mqtt_client->disconnect_cb(mqtt_client->disconnect_ctx);
99 }
100 }
101 mqtt_client->xioHandle = NULL;
102}
103
104static void set_error_callback(MQTT_CLIENT* mqtt_client, MQTT_CLIENT_EVENT_ERROR error_type)
105{
106 if (mqtt_client->fnOnErrorCallBack)
107 {
108 mqtt_client->fnOnErrorCallBack(mqtt_client, error_type, mqtt_client->errorCBCtx);
109 }
110 close_connection(mqtt_client);
111}
112
113static STRING_HANDLE construct_trace_log_handle(MQTT_CLIENT* mqtt_client)
114{
115#ifndef NO_LOGGING
116 STRING_HANDLE trace_log;
117 if (mqtt_client->logTrace)
118 {
119 trace_log = STRING_new();
120 }
121 else
122 {
123 trace_log = NULL;
124 }
125 return trace_log;
126#else
127 return NULL;
128#endif
129}
130
/*
 * Reads a big-endian 16-bit value from *buffer and advances *buffer by two.
 *
 * buffer  - address of the read cursor
 * byteLen - number of bytes available at the cursor
 *
 * Returns the decoded value, or 0 (after logging, cursor untouched) when the
 * cursor is NULL or fewer than two bytes are available.
 */
static uint16_t byteutil_read_uint16(uint8_t** buffer, size_t byteLen)
{
    if (buffer == NULL || *buffer == NULL || byteLen < 2)
    {
        LogError("byteutil_read_uint16 == NULL or less than 2");
        return 0;
    }

    const uint8_t* bytes = *buffer;
    uint16_t value = (uint16_t)((bytes[0] << 8) | bytes[1]);
    *buffer += 2; // Move the ptr
    return value;
}
145
/*
 * Reads a length-prefixed string (16-bit big-endian length followed by the
 * characters) from *buffer.
 *
 * buffer  - address of the read cursor; advanced past the bytes consumed
 * byteLen - in: bytes available at the cursor; out: length of the string
 *           (only updated when a string is returned)
 *
 * Returns a newly malloc'ed, NUL-terminated copy that the caller must free,
 * or NULL when the length prefix is zero/unreadable, exceeds the available
 * bytes, or the allocation fails.
 */
static char* byteutil_readUTF(uint8_t** buffer, size_t* byteLen)
{
    const uint8_t* start = *buffer;

    // A length of 0 also covers the case where the prefix could not be read
    uint16_t textLen = byteutil_read_uint16(buffer, *byteLen);
    size_t required = (size_t)(textLen + (*buffer - start));
    if (textLen == 0 || required > *byteLen)
    {
        LogError("String passed not a valid UTF.");
        return NULL;
    }

    char* text = (char*)malloc(textLen + 1);
    if (text != NULL)
    {
        (void)memcpy(text, *buffer, textLen);
        text[textLen] = '\0';
        *buffer += textLen;
        *byteLen = textLen;
    }
    return text;
}
173
/*
 * Reads one byte from *buffer and advances *buffer by one.
 *
 * buffer - address of the read cursor
 *
 * Returns the byte read, or 0 (after logging, cursor untouched) when either
 * the cursor address or the cursor itself is NULL. The caller is responsible
 * for making sure at least one byte is available.
 */
static uint8_t byteutil_readByte(uint8_t** buffer)
{
    uint8_t result = 0;
    // Check the cursor itself as well, the same way byteutil_read_uint16 does
    if (buffer != NULL && *buffer != NULL)
    {
        result = **buffer;
        (*buffer)++;
    }
    else
    {
        LogError("readByte buffer == NULL.");
    }
    return result;
}
188
189static void sendComplete(void* context, IO_SEND_RESULT send_result)
190{
191 MQTT_CLIENT* mqtt_client = (MQTT_CLIENT*)context;
192 if (mqtt_client != NULL)
193 {
194 if (send_result == IO_SEND_OK)
195 {
196 if (mqtt_client->packetState == DISCONNECT_TYPE)
197 {
198 /*Codes_SRS_MQTT_CLIENT_07_032: [If the actionResult parameter is of type MQTT_CLIENT_ON_DISCONNECT the the msgInfo value shall be NULL.]*/
199 if (mqtt_client->fnOperationCallback != NULL)
200 {
201 mqtt_client->fnOperationCallback(mqtt_client, MQTT_CLIENT_ON_DISCONNECT, NULL, mqtt_client->ctx);
202 }
203 // close the xio
204 close_connection(mqtt_client);
205 }
206 }
207 else if (send_result == IO_SEND_ERROR)
208 {
209 LogError("MQTT Send Complete Failure send_result: %d", (int)send_result);
210 set_error_callback(mqtt_client, MQTT_CLIENT_COMMUNICATION_ERROR);
211 }
212 }
213 else
214 {
215 LogError("MQTT Send Complete Failure with NULL mqtt_client");
216 }
217}
218
219#ifndef NO_LOGGING
/*
 * Formats the current local time as "HH:MM:SS" into timeResult.
 *
 * timeResult - destination buffer (nothing happens when NULL)
 * len        - size of the destination buffer
 *
 * When the time cannot be obtained, converted or formatted, timeResult is
 * set to the empty string.
 */
static void getLogTime(char* timeResult, size_t len)
{
    if (timeResult == NULL)
    {
        return;
    }

    bool formatted = false;
    time_t now = get_time(NULL);
    if (now != (time_t)-1)
    {
        struct tm* localNow = localtime(&now);
        if (localNow != NULL && strftime(timeResult, len, "%H:%M:%S", localNow) != 0)
        {
            formatted = true;
        }
    }

    if (!formatted)
    {
        timeResult[0] = '\0';
    }
}
246
247#ifdef ENABLE_RAW_TRACE
248static const char* retrievePacketType(CONTROL_PACKET_TYPE packet)
249{
250 switch (packet&CONNECT_PACKET_MASK)
251 {
252 case CONNECT_TYPE: return "CONNECT";
253 case CONNACK_TYPE: return "CONNACK";
254 case PUBLISH_TYPE: return "PUBLISH";
255 case PUBACK_TYPE: return "PUBACK";
256 case PUBREC_TYPE: return "PUBREC";
257 case PUBREL_TYPE: return "PUBREL";
258 case SUBSCRIBE_TYPE: return "SUBSCRIBE";
259 case SUBACK_TYPE: return "SUBACK";
260 case UNSUBSCRIBE_TYPE: return "UNSUBSCRIBE";
261 case UNSUBACK_TYPE: return "UNSUBACK";
262 case PINGREQ_TYPE: return "PINGREQ";
263 case PINGRESP_TYPE: return "PINGRESP";
264 case DISCONNECT_TYPE: return "DISCONNECT";
265 default:
266 case PACKET_TYPE_ERROR:
267 case UNKNOWN_TYPE:
268 return "UNKNOWN";
269 }
270}
271
272static void logOutgoingRawTrace(MQTT_CLIENT* mqtt_client, const uint8_t* data, size_t length)
273{
274 if (mqtt_client != NULL && data != NULL && length > 0 && mqtt_client->rawBytesTrace)
275 {
276 char tmBuffer[TIME_MAX_BUFFER];
277 getLogTime(tmBuffer, TIME_MAX_BUFFER);
278
279 LOG(AZ_LOG_TRACE, 0, "-> %s %s: ", tmBuffer, retrievePacketType((unsigned char)data[0]));
280 size_t index = 0;
281 for (index = 0; index < length; index++)
282 {
283 LOG(AZ_LOG_TRACE, 0, "0x%02x ", data[index]);
284 }
285 LOG(AZ_LOG_TRACE, LOG_LINE, " ");
286 }
287}
288
289static void logIncomingRawTrace(MQTT_CLIENT* mqtt_client, CONTROL_PACKET_TYPE packet, uint8_t flags, const uint8_t* data, size_t length)
290{
291 if (mqtt_client != NULL && mqtt_client->rawBytesTrace)
292 {
293 if (data != NULL && length > 0)
294 {
295 char tmBuffer[TIME_MAX_BUFFER];
296 getLogTime(tmBuffer, TIME_MAX_BUFFER);
297
298 LOG(AZ_LOG_TRACE, 0, "<- %s %s: 0x%02x 0x%02x ", tmBuffer, retrievePacketType((CONTROL_PACKET_TYPE)packet), (unsigned int)(packet | flags), (unsigned int)length);
299 size_t index = 0;
300 for (index = 0; index < length; index++)
301 {
302 LOG(AZ_LOG_TRACE, 0, "0x%02x ", data[index]);
303 }
304 LOG(AZ_LOG_TRACE, LOG_LINE, " ");
305 }
306 else if (packet == PINGRESP_TYPE)
307 {
308 char tmBuffer[TIME_MAX_BUFFER];
309 getLogTime(tmBuffer, TIME_MAX_BUFFER);
310 LOG(AZ_LOG_TRACE, LOG_LINE, "<- %s %s: 0x%02x 0x%02x ", tmBuffer, retrievePacketType((CONTROL_PACKET_TYPE)packet), (unsigned int)(packet | flags), (unsigned int)length);
311 }
312 }
313}
314#endif // ENABLE_RAW_TRACE
315
316static void log_outgoing_trace(MQTT_CLIENT* mqtt_client, STRING_HANDLE trace_log)
317{
318 if (mqtt_client != NULL && mqtt_client->logTrace && trace_log != NULL)
319 {
320 char tmBuffer[TIME_MAX_BUFFER];
321 getLogTime(tmBuffer, TIME_MAX_BUFFER);
322 LOG(AZ_LOG_TRACE, LOG_LINE, "-> %s %s", tmBuffer, STRING_c_str(trace_log));
323 }
324}
325
326static void log_incoming_trace(MQTT_CLIENT* mqtt_client, STRING_HANDLE trace_log)
327{
328 if (mqtt_client != NULL && mqtt_client->logTrace && trace_log != NULL)
329 {
330 char tmBuffer[TIME_MAX_BUFFER];
331 getLogTime(tmBuffer, TIME_MAX_BUFFER);
332 LOG(AZ_LOG_TRACE, LOG_LINE, "<- %s %s", tmBuffer, STRING_c_str(trace_log) );
333 }
334}
335#else // NO_LOGGING
336static void logOutgoingRawTrace(MQTT_CLIENT* mqtt_client, const uint8_t* data, size_t length)
337{
338 AZURE_UNREFERENCED_PARAMETER(mqtt_client);
339 AZURE_UNREFERENCED_PARAMETER(data);
340 AZURE_UNREFERENCED_PARAMETER(length);
341}
342
343static void log_outgoing_trace(MQTT_CLIENT* mqtt_client, STRING_HANDLE trace_log)
344{
345 AZURE_UNREFERENCED_PARAMETER(mqtt_client);
346 AZURE_UNREFERENCED_PARAMETER(trace_log);
347}
348
349static void log_incoming_trace(MQTT_CLIENT* mqtt_client, STRING_HANDLE trace_log)
350{
351 AZURE_UNREFERENCED_PARAMETER(mqtt_client);
352 AZURE_UNREFERENCED_PARAMETER(trace_log);
353}
354
355static void logIncomingRawTrace(MQTT_CLIENT* mqtt_client, CONTROL_PACKET_TYPE packet, uint8_t flags, const uint8_t* data, size_t length)
356{
357 AZURE_UNREFERENCED_PARAMETER(mqtt_client);
358 AZURE_UNREFERENCED_PARAMETER(packet);
359 AZURE_UNREFERENCED_PARAMETER(flags);
360 AZURE_UNREFERENCED_PARAMETER(data);
361 AZURE_UNREFERENCED_PARAMETER(length);
362}
363#endif // NO_LOGGING
364
365static int sendPacketItem(MQTT_CLIENT* mqtt_client, const unsigned char* data, size_t length)
366{
367 int result;
368
369 if (tickcounter_get_current_ms(mqtt_client->packetTickCntr, &mqtt_client->packetSendTimeMs) != 0)
370 {
371 LogError("Failure getting current ms tickcounter");
372 result = MU_FAILURE;
373 }
374 else
375 {
376 result = xio_send(mqtt_client->xioHandle, (const void*)data, length, sendComplete, mqtt_client);
377 if (result != 0)
378 {
379 LogError("%d: Failure sending control packet data", result);
380 result = MU_FAILURE;
381 }
382 else
383 {
384#ifdef ENABLE_RAW_TRACE
385 logOutgoingRawTrace(mqtt_client, (const uint8_t*)data, length);
386#endif
387 }
388 }
389 return result;
390}
391
392static void onOpenComplete(void* context, IO_OPEN_RESULT open_result)
393{
394 MQTT_CLIENT* mqtt_client = (MQTT_CLIENT*)context;
395 if (mqtt_client != NULL)
396 {
397 if (open_result == IO_OPEN_OK && !mqtt_client->socketConnected)
398 {
399 mqtt_client->packetState = CONNECT_TYPE;
400 mqtt_client->socketConnected = true;
401
402 STRING_HANDLE trace_log = construct_trace_log_handle(mqtt_client);
403
404 // Send the Connect packet
405 BUFFER_HANDLE connPacket = mqtt_codec_connect(&mqtt_client->mqttOptions, trace_log);
406 if (connPacket == NULL)
407 {
408 LogError("Error: mqtt_codec_connect failed");
409 }
410 else
411 {
412 size_t size = BUFFER_length(connPacket);
413 /*Codes_SRS_MQTT_CLIENT_07_009: [On success mqtt_client_connect shall send the MQTT CONNECT to the endpoint.]*/
414 if (sendPacketItem(mqtt_client, BUFFER_u_char(connPacket), size) != 0)
415 {
416 LogError("Error: mqtt_codec_connect failed");
417 }
418 else
419 {
420 log_outgoing_trace(mqtt_client, trace_log);
421 }
422 BUFFER_delete(connPacket);
423 }
424 if (trace_log != NULL)
425 {
426 STRING_delete(trace_log);
427 }
428 }
429 else
430 {
431 if (mqtt_client->socketConnected == false && mqtt_client->fnOnErrorCallBack)
432 {
433 mqtt_client->fnOnErrorCallBack(mqtt_client, MQTT_CLIENT_CONNECTION_ERROR, mqtt_client->errorCBCtx);
434 }
435 close_connection(mqtt_client);
436 }
437 }
438 else
439 {
440 LogError("Error: mqtt_client is NULL");
441 }
442}
443
444static void onBytesReceived(void* context, const unsigned char* buffer, size_t size)
445{
446 MQTT_CLIENT* mqtt_client = (MQTT_CLIENT*)context;
447 if (mqtt_client != NULL)
448 {
449 if (mqtt_codec_bytesReceived(mqtt_client->codec_handle, buffer, size) != 0)
450 {
451 mqtt_codec_reset(mqtt_client->codec_handle);
452 set_error_callback(mqtt_client, MQTT_CLIENT_PARSE_ERROR);
453 }
454 }
455 else
456 {
457 LogError("Error: mqtt_client is NULL");
458 }
459}
460
461static void onIoError(void* context)
462{
463 MQTT_CLIENT* mqtt_client = (MQTT_CLIENT*)context;
464 if (mqtt_client != NULL && mqtt_client->fnOperationCallback)
465 {
466 /*Codes_SRS_MQTT_CLIENT_07_032: [If the actionResult parameter is of type MQTT_CLIENT_ON_DISCONNECT the the msgInfo value shall be NULL.]*/
467 /* Codes_SRS_MQTT_CLIENT_07_036: [ If an error is encountered by the ioHandle the mqtt_client shall call xio_close. ] */
468 set_error_callback(mqtt_client, MQTT_CLIENT_CONNECTION_ERROR);
469 }
470 else
471 {
472 LogError("Error invalid parameter: mqtt_client: %p", mqtt_client);
473 }
474}
475
476static void clear_mqtt_options(MQTT_CLIENT* mqtt_client)
477{
478 if (mqtt_client->mqttOptions.clientId != NULL)
479 {
480 free(mqtt_client->mqttOptions.clientId);
481 mqtt_client->mqttOptions.clientId = NULL;
482 }
483
484 if (mqtt_client->mqttOptions.willTopic != NULL)
485 {
486 free(mqtt_client->mqttOptions.willTopic);
487 mqtt_client->mqttOptions.willTopic = NULL;
488 }
489
490 if (mqtt_client->mqttOptions.willMessage != NULL)
491 {
492 free(mqtt_client->mqttOptions.willMessage);
493 mqtt_client->mqttOptions.willMessage = NULL;
494 }
495
496 if (mqtt_client->mqttOptions.username != NULL)
497 {
498 free(mqtt_client->mqttOptions.username);
499 mqtt_client->mqttOptions.username = NULL;
500 }
501
502 if (mqtt_client->mqttOptions.password != NULL)
503 {
504 free(mqtt_client->mqttOptions.password);
505 mqtt_client->mqttOptions.password = NULL;
506 }
507}
508
509static int cloneMqttOptions(MQTT_CLIENT* mqtt_client, const MQTT_CLIENT_OPTIONS* mqttOptions)
510{
511 int result = 0;
512 char* temp_option;
513
514 if (mqttOptions->clientId != NULL)
515 {
516 if (mallocAndStrcpy_s(&temp_option, mqttOptions->clientId) != 0)
517 {
518 result = MU_FAILURE;
519 LogError("mallocAndStrcpy_s clientId");
520 }
521 else
522 {
523 if (mqtt_client->mqttOptions.clientId != NULL)
524 {
525 free(mqtt_client->mqttOptions.clientId);
526 }
527 mqtt_client->mqttOptions.clientId = temp_option;
528 }
529 }
530 if (result == 0 && mqttOptions->willTopic != NULL)
531 {
532 temp_option = NULL;
533 if (mallocAndStrcpy_s(&temp_option, mqttOptions->willTopic) != 0)
534 {
535 result = MU_FAILURE;
536 LogError("mallocAndStrcpy_s willTopic");
537 }
538 else
539 {
540 if (mqtt_client->mqttOptions.willTopic != NULL)
541 {
542 free(mqtt_client->mqttOptions.willTopic);
543 }
544 mqtt_client->mqttOptions.willTopic = temp_option;
545 }
546 }
547 if (result == 0 && mqttOptions->willMessage != NULL)
548 {
549 temp_option = NULL;
550 if (mallocAndStrcpy_s(&temp_option, mqttOptions->willMessage) != 0)
551 {
552 LogError("mallocAndStrcpy_s willMessage");
553 result = MU_FAILURE;
554 }
555 else
556 {
557 if (mqtt_client->mqttOptions.willMessage != NULL)
558 {
559 free(mqtt_client->mqttOptions.willMessage);
560 }
561 mqtt_client->mqttOptions.willMessage = temp_option;
562 }
563 }
564 if (result == 0 && mqttOptions->username != NULL)
565 {
566 temp_option = NULL;
567 if (mallocAndStrcpy_s(&temp_option, mqttOptions->username) != 0)
568 {
569 LogError("mallocAndStrcpy_s username");
570 result = MU_FAILURE;
571 }
572 else
573 {
574 if (mqtt_client->mqttOptions.username != NULL)
575 {
576 free(mqtt_client->mqttOptions.username);
577 }
578 mqtt_client->mqttOptions.username = temp_option;
579 }
580 }
581 if (result == 0 && mqttOptions->password != NULL)
582 {
583 temp_option = NULL;
584 if (mallocAndStrcpy_s(&temp_option, mqttOptions->password) != 0)
585 {
586 LogError("mallocAndStrcpy_s password");
587 result = MU_FAILURE;
588 }
589 else
590 {
591 if (mqtt_client->mqttOptions.password != NULL)
592 {
593 free(mqtt_client->mqttOptions.password);
594 }
595 mqtt_client->mqttOptions.password = temp_option;
596 }
597 }
598 if (result == 0)
599 {
600 mqtt_client->mqttOptions.keepAliveInterval = mqttOptions->keepAliveInterval;
601 mqtt_client->mqttOptions.messageRetain = mqttOptions->messageRetain;
602 mqtt_client->mqttOptions.useCleanSession = mqttOptions->useCleanSession;
603 mqtt_client->mqttOptions.qualityOfServiceValue = mqttOptions->qualityOfServiceValue;
604 }
605 else
606 {
607 clear_mqtt_options(mqtt_client);
608 }
609 return result;
610}
611
612static void ProcessPublishMessage(MQTT_CLIENT* mqtt_client, uint8_t* initialPos, size_t packetLength, int flags)
613{
614 bool isDuplicateMsg = (flags & DUPLICATE_FLAG_MASK) ? true : false;
615 bool isRetainMsg = (flags & RETAIN_FLAG_MASK) ? true : false;
616 QOS_VALUE qosValue = (flags == 0) ? DELIVER_AT_MOST_ONCE : (flags & QOS_LEAST_ONCE_FLAG_MASK) ? DELIVER_AT_LEAST_ONCE : DELIVER_EXACTLY_ONCE;
617
618 uint8_t* iterator = initialPos;
619 size_t numberOfBytesToBeRead = packetLength;
620 size_t lengthOfTopicName = numberOfBytesToBeRead;
621 char* topicName = byteutil_readUTF(&iterator, &lengthOfTopicName);
622 if (topicName == NULL)
623 {
624 LogError("Publish MSG: failure reading topic name");
625 set_error_callback(mqtt_client, MQTT_CLIENT_PARSE_ERROR);
626 }
627 else
628 {
629 STRING_HANDLE trace_log = NULL;
630
631#ifndef NO_LOGGING
632 if (mqtt_client->logTrace)
633 {
634 trace_log = STRING_construct_sprintf("PUBLISH | IS_DUP: %s | RETAIN: %d | QOS: %s | TOPIC_NAME: %s", isDuplicateMsg ? TRUE_CONST : FALSE_CONST,
635 isRetainMsg ? 1 : 0, MU_ENUM_TO_STRING(QOS_VALUE, qosValue), topicName);
636 }
637#endif
638 uint16_t packetId = 0;
639 numberOfBytesToBeRead = packetLength - (iterator - initialPos);
640 if (qosValue != DELIVER_AT_MOST_ONCE)
641 {
642 packetId = byteutil_read_uint16(&iterator, numberOfBytesToBeRead);
643#ifndef NO_LOGGING
644 if (mqtt_client->logTrace)
645 {
646 STRING_sprintf(trace_log, " | PACKET_ID: %"PRIu16, packetId);
647 }
648#endif
649 }
650 if ((qosValue != DELIVER_AT_MOST_ONCE) && (packetId == 0))
651 {
652 LogError("Publish MSG: packetId=0, invalid");
653 set_error_callback(mqtt_client, MQTT_CLIENT_PARSE_ERROR);
654 }
655 else
656 {
657 numberOfBytesToBeRead = packetLength - (iterator - initialPos);
658
659 MQTT_MESSAGE_HANDLE msgHandle = mqttmessage_create_in_place(packetId, topicName, qosValue, iterator, numberOfBytesToBeRead);
660 if (msgHandle == NULL)
661 {
662 LogError("failure in mqttmessage_create");
663 set_error_callback(mqtt_client, MQTT_CLIENT_MEMORY_ERROR);
664 }
665 else if (mqttmessage_setIsDuplicateMsg(msgHandle, isDuplicateMsg) != 0 ||
666 mqttmessage_setIsRetained(msgHandle, isRetainMsg) != 0)
667 {
668 LogError("failure setting mqtt message property");
669 set_error_callback(mqtt_client, MQTT_CLIENT_MEMORY_ERROR);
670 }
671 else
672 {
673#ifndef NO_LOGGING
674 if (mqtt_client->logTrace)
675 {
676 STRING_sprintf(trace_log, " | PAYLOAD_LEN: %lu", (unsigned long)numberOfBytesToBeRead);
677 log_incoming_trace(mqtt_client, trace_log);
678 }
679#endif
680 mqtt_client->fnMessageRecv(msgHandle, mqtt_client->ctx);
681
682 BUFFER_HANDLE pubRel = NULL;
683 if (qosValue == DELIVER_EXACTLY_ONCE)
684 {
685 pubRel = mqtt_codec_publishReceived(packetId);
686 if (pubRel == NULL)
687 {
688 LogError("Failed to allocate publish receive message.");
689 set_error_callback(mqtt_client, MQTT_CLIENT_MEMORY_ERROR);
690 }
691 }
692 else if (qosValue == DELIVER_AT_LEAST_ONCE)
693 {
694 pubRel = mqtt_codec_publishAck(packetId);
695 if (pubRel == NULL)
696 {
697 LogError("Failed to allocate publish ack message.");
698 set_error_callback(mqtt_client, MQTT_CLIENT_MEMORY_ERROR);
699 }
700 }
701 if (pubRel != NULL)
702 {
703 size_t size = BUFFER_length(pubRel);
704 (void)sendPacketItem(mqtt_client, BUFFER_u_char(pubRel), size);
705 BUFFER_delete(pubRel);
706 }
707 }
708 mqttmessage_destroy(msgHandle);
709 }
710
711 if (trace_log != NULL)
712 {
713 STRING_delete(trace_log);
714 }
715
716 free(topicName);
717 }
718}
719
720static void recvCompleteCallback(void* context, CONTROL_PACKET_TYPE packet, int flags, BUFFER_HANDLE headerData)
721{
722 MQTT_CLIENT* mqtt_client = (MQTT_CLIENT*)context;
723 if (mqtt_client != NULL)
724 {
725 size_t packetLength = 0;
726 uint8_t* iterator = NULL;
727 if (headerData != NULL)
728 {
729 packetLength = BUFFER_length(headerData);
730 iterator = BUFFER_u_char(headerData);
731 }
732
733#ifdef ENABLE_RAW_TRACE
734 logIncomingRawTrace(mqtt_client, packet, (uint8_t)flags, iterator, packetLength);
735#endif
736 if ((iterator != NULL && packetLength > 0) || packet == PINGRESP_TYPE)
737 {
738 switch (packet)
739 {
740 case CONNACK_TYPE:
741 {
742 /*Codes_SRS_MQTT_CLIENT_07_028: [If the actionResult parameter is of type CONNECT_ACK then the msgInfo value shall be a CONNECT_ACK structure.]*/
743 CONNECT_ACK connack = { 0 };
744 connack.isSessionPresent = (byteutil_readByte(&iterator) == 0x1) ? true : false;
745 uint8_t rc = byteutil_readByte(&iterator);
746 connack.returnCode =
747 (rc < ((uint8_t)CONN_REFUSED_UNKNOWN)) ?
748 (CONNECT_RETURN_CODE)rc : CONN_REFUSED_UNKNOWN;
749
750#ifndef NO_LOGGING
751 if (mqtt_client->logTrace)
752 {
753 STRING_HANDLE trace_log = STRING_construct_sprintf("CONNACK | SESSION_PRESENT: %s | RETURN_CODE: 0x%x", connack.isSessionPresent ? TRUE_CONST : FALSE_CONST, connack.returnCode);
754 log_incoming_trace(mqtt_client, trace_log);
755 STRING_delete(trace_log);
756 }
757#endif
758 mqtt_client->fnOperationCallback(mqtt_client, MQTT_CLIENT_ON_CONNACK, (void*)&connack, mqtt_client->ctx);
759
760 if (connack.returnCode == CONNECTION_ACCEPTED)
761 {
762 mqtt_client->clientConnected = true;
763 }
764 break;
765 }
766 case PUBLISH_TYPE:
767 {
768 ProcessPublishMessage(mqtt_client, iterator, packetLength, flags);
769 break;
770 }
771 case PUBACK_TYPE:
772 case PUBREC_TYPE:
773 case PUBREL_TYPE:
774 case PUBCOMP_TYPE:
775 {
776 /*Codes_SRS_MQTT_CLIENT_07_029: [If the actionResult parameter are of types PUBACK_TYPE, PUBREC_TYPE, PUBREL_TYPE or PUBCOMP_TYPE then the msgInfo value shall be a PUBLISH_ACK structure.]*/
777 MQTT_CLIENT_EVENT_RESULT action = (packet == PUBACK_TYPE) ? MQTT_CLIENT_ON_PUBLISH_ACK :
778 (packet == PUBREC_TYPE) ? MQTT_CLIENT_ON_PUBLISH_RECV :
779 (packet == PUBREL_TYPE) ? MQTT_CLIENT_ON_PUBLISH_REL : MQTT_CLIENT_ON_PUBLISH_COMP;
780
781 PUBLISH_ACK publish_ack = { 0 };
782 publish_ack.packetId = byteutil_read_uint16(&iterator, packetLength);
783
784#ifndef NO_LOGGING
785 if (mqtt_client->logTrace)
786 {
787 STRING_HANDLE trace_log = STRING_construct_sprintf("%s | PACKET_ID: %"PRIu16, packet == PUBACK_TYPE ? "PUBACK" : (packet == PUBREC_TYPE) ? "PUBREC" : (packet == PUBREL_TYPE) ? "PUBREL" : "PUBCOMP",
788 publish_ack.packetId);
789
790 log_incoming_trace(mqtt_client, trace_log);
791 STRING_delete(trace_log);
792 }
793#endif
794 BUFFER_HANDLE pubRel = NULL;
795 mqtt_client->fnOperationCallback(mqtt_client, action, (void*)&publish_ack, mqtt_client->ctx);
796 if (packet == PUBREC_TYPE)
797 {
798 pubRel = mqtt_codec_publishRelease(publish_ack.packetId);
799 if (pubRel == NULL)
800 {
801 LogError("Failed to allocate publish release message.");
802 set_error_callback(mqtt_client, MQTT_CLIENT_MEMORY_ERROR);
803 }
804 }
805 else if (packet == PUBREL_TYPE)
806 {
807 pubRel = mqtt_codec_publishComplete(publish_ack.packetId);
808 if (pubRel == NULL)
809 {
810 LogError("Failed to allocate publish complete message.");
811 set_error_callback(mqtt_client, MQTT_CLIENT_MEMORY_ERROR);
812 }
813 }
814 if (pubRel != NULL)
815 {
816 size_t size = BUFFER_length(pubRel);
817 (void)sendPacketItem(mqtt_client, BUFFER_u_char(pubRel), size);
818 BUFFER_delete(pubRel);
819 }
820 break;
821 }
822 case SUBACK_TYPE:
823 {
824
825 /*Codes_SRS_MQTT_CLIENT_07_030: [If the actionResult parameter is of type SUBACK_TYPE then the msgInfo value shall be a SUBSCRIBE_ACK structure.]*/
826 SUBSCRIBE_ACK suback = { 0 };
827
828 size_t remainLen = packetLength;
829 suback.packetId = byteutil_read_uint16(&iterator, packetLength);
830 remainLen -= 2;
831
832#ifndef NO_LOGGING
833 STRING_HANDLE trace_log = NULL;
834 if (mqtt_client->logTrace)
835 {
836 trace_log = STRING_construct_sprintf("SUBACK | PACKET_ID: %"PRIu16, suback.packetId);
837 }
838#endif
839 // Allocate the remaining len
840 suback.qosReturn = (QOS_VALUE*)malloc(sizeof(QOS_VALUE)*remainLen);
841 if (suback.qosReturn != NULL)
842 {
843 while (remainLen > 0)
844 {
845 uint8_t qosRet = byteutil_readByte(&iterator);
846 suback.qosReturn[suback.qosCount++] =
847 (qosRet <= ((uint8_t)DELIVER_EXACTLY_ONCE)) ?
848 (QOS_VALUE)qosRet : DELIVER_FAILURE;
849 remainLen--;
850#ifndef NO_LOGGING
851 if (mqtt_client->logTrace)
852 {
853 STRING_sprintf(trace_log, " | RETURN_CODE: %"PRIu16, suback.qosReturn[suback.qosCount-1]);
854 }
855#endif
856 }
857
858#ifndef NO_LOGGING
859 if (mqtt_client->logTrace)
860 {
861 log_incoming_trace(mqtt_client, trace_log);
862 STRING_delete(trace_log);
863 }
864#endif
865 mqtt_client->fnOperationCallback(mqtt_client, MQTT_CLIENT_ON_SUBSCRIBE_ACK, (void*)&suback, mqtt_client->ctx);
866 free(suback.qosReturn);
867 }
868 else
869 {
870 LogError("allocation of quality of service value failed.");
871 set_error_callback(mqtt_client, MQTT_CLIENT_MEMORY_ERROR);
872 }
873 break;
874 }
875 case UNSUBACK_TYPE:
876 {
877 /*Codes_SRS_MQTT_CLIENT_07_031: [If the actionResult parameter is of type UNSUBACK_TYPE then the msgInfo value shall be a UNSUBSCRIBE_ACK structure.]*/
878 UNSUBSCRIBE_ACK unsuback = { 0 };
879 unsuback.packetId = byteutil_read_uint16(&iterator, packetLength);
880
881#ifndef NO_LOGGING
882 if (mqtt_client->logTrace)
883 {
884 STRING_HANDLE trace_log = STRING_construct_sprintf("UNSUBACK | PACKET_ID: %"PRIu16, unsuback.packetId);
885 log_incoming_trace(mqtt_client, trace_log);
886 STRING_delete(trace_log);
887 }
888#endif
889 mqtt_client->fnOperationCallback(mqtt_client, MQTT_CLIENT_ON_UNSUBSCRIBE_ACK, (void*)&unsuback, mqtt_client->ctx);
890 break;
891 }
892 case PINGRESP_TYPE:
893 mqtt_client->timeSincePing = 0;
894#ifndef NO_LOGGING
895 if (mqtt_client->logTrace)
896 {
897 STRING_HANDLE trace_log = STRING_construct_sprintf("PINGRESP");
898 log_incoming_trace(mqtt_client, trace_log);
899 STRING_delete(trace_log);
900 }
901#endif
902 // Forward ping response to operation callback
903 mqtt_client->fnOperationCallback(mqtt_client, MQTT_CLIENT_ON_PING_RESPONSE, NULL, mqtt_client->ctx);
904 break;
905 default:
906 break;
907 }
908 }
909 }
910 else
911 {
912 LogError("recvCompleteCallback context failed.");
913 }
914}
915
916MQTT_CLIENT_HANDLE mqtt_client_init(ON_MQTT_MESSAGE_RECV_CALLBACK msgRecv, ON_MQTT_OPERATION_CALLBACK operation_cb, void* opCallbackCtx, ON_MQTT_ERROR_CALLBACK onErrorCallBack, void* errorCBCtx)
917{
918 MQTT_CLIENT* result;
919 /*Codes_SRS_MQTT_CLIENT_07_001: [If the parameters ON_MQTT_MESSAGE_RECV_CALLBACK is NULL then mqttclient_init shall return NULL.]*/
920 if (msgRecv == NULL || operation_cb == NULL)
921 {
922 LogError("Invalid parameter specified msgRecv: %p, operation_cb: %p", msgRecv, operation_cb);
923 result = NULL;
924 }
925 else
926 {
927 result = malloc(sizeof(MQTT_CLIENT));
928 if (result == NULL)
929 {
930 /*Codes_SRS_MQTT_CLIENT_07_002: [If any failure is encountered then mqttclient_init shall return NULL.]*/
931 LogError("mqtt_client_init failure: Allocation Failure");
932 }
933 else
934 {
935 memset(result, 0, sizeof(MQTT_CLIENT));
936 /*Codes_SRS_MQTT_CLIENT_07_003: [mqttclient_init shall allocate MQTTCLIENT_DATA_INSTANCE and return the MQTTCLIENT_HANDLE on success.]*/
937 result->packetState = UNKNOWN_TYPE;
938 result->fnOperationCallback = operation_cb;
939 result->ctx = opCallbackCtx;
940 result->fnMessageRecv = msgRecv;
941 result->fnOnErrorCallBack = onErrorCallBack;
942 result->errorCBCtx = errorCBCtx;
943 result->qosValue = DELIVER_AT_MOST_ONCE;
944 result->packetTickCntr = tickcounter_create();
945 result->maxPingRespTime = DEFAULT_MAX_PING_RESPONSE_TIME;
946 if (result->packetTickCntr == NULL)
947 {
948 /*Codes_SRS_MQTT_CLIENT_07_002: [If any failure is encountered then mqttclient_init shall return NULL.]*/
949 LogError("mqtt_client_init failure: tickcounter_create failure");
950 free(result);
951 result = NULL;
952 }
953 else
954 {
955 result->codec_handle = mqtt_codec_create(recvCompleteCallback, result);
956 if (result->codec_handle == NULL)
957 {
958 /*Codes_SRS_MQTT_CLIENT_07_002: [If any failure is encountered then mqttclient_init shall return NULL.]*/
959 LogError("mqtt_client_init failure: mqtt_codec_create failure");
960 tickcounter_destroy(result->packetTickCntr);
961 free(result);
962 result = NULL;
963 }
964 }
965 }
966 }
967 return result;
968}
969
970void mqtt_client_deinit(MQTT_CLIENT_HANDLE handle)
971{
972 /*Codes_SRS_MQTT_CLIENT_07_004: [If the parameter handle is NULL then function mqtt_client_deinit shall do nothing.]*/
973 if (handle != NULL)
974 {
975 /*Codes_SRS_MQTT_CLIENT_07_005: [mqtt_client_deinit shall deallocate all memory allocated in this unit.]*/
976 MQTT_CLIENT* mqtt_client = (MQTT_CLIENT*)handle;
977 tickcounter_destroy(mqtt_client->packetTickCntr);
978 mqtt_codec_destroy(mqtt_client->codec_handle);
979 clear_mqtt_options(mqtt_client);
980 free(mqtt_client);
981 }
982}
983
984int mqtt_client_connect(MQTT_CLIENT_HANDLE handle, XIO_HANDLE xioHandle, MQTT_CLIENT_OPTIONS* mqttOptions)
985{
986 int result;
987 /*SRS_MQTT_CLIENT_07_006: [If any of the parameters handle, ioHandle, or mqttOptions are NULL then mqtt_client_connect shall return a non-zero value.]*/
988 if (handle == NULL || mqttOptions == NULL || xioHandle == NULL)
989 {
990 LogError("mqtt_client_connect: NULL argument (handle = %p, mqttOptions = %p, xioHandle: %p)", handle, mqttOptions, xioHandle);
991 result = MU_FAILURE;
992 }
993 else
994 {
995 MQTT_CLIENT* mqtt_client = (MQTT_CLIENT*)handle;
996 mqtt_client->xioHandle = xioHandle;
997 mqtt_client->packetState = UNKNOWN_TYPE;
998 mqtt_client->qosValue = mqttOptions->qualityOfServiceValue;
999 mqtt_client->keepAliveInterval = mqttOptions->keepAliveInterval;
1000 mqtt_client->maxPingRespTime = (DEFAULT_MAX_PING_RESPONSE_TIME < mqttOptions->keepAliveInterval/2) ? DEFAULT_MAX_PING_RESPONSE_TIME : mqttOptions->keepAliveInterval/2;
1001 if (cloneMqttOptions(mqtt_client, mqttOptions) != 0)
1002 {
1003 LogError("Error: Clone Mqtt Options failed");
1004 result = MU_FAILURE;
1005 }
1006 /*Codes_SRS_MQTT_CLIENT_07_008: [mqtt_client_connect shall open the XIO_HANDLE by calling into the xio_open interface.]*/
1007 else if (xio_open(xioHandle, onOpenComplete, mqtt_client, onBytesReceived, mqtt_client, onIoError, mqtt_client) != 0)
1008 {
1009 /*Codes_SRS_MQTT_CLIENT_07_007: [If any failure is encountered then mqtt_client_connect shall return a non-zero value.]*/
1010 LogError("Error: io_open failed");
1011 result = MU_FAILURE;
1012 mqtt_client->xioHandle = NULL;
1013 // Remove cloned options
1014 clear_mqtt_options(mqtt_client);
1015 }
1016 else
1017 {
1018 result = 0;
1019 }
1020 }
1021 return result;
1022}
1023
1024int mqtt_client_publish(MQTT_CLIENT_HANDLE handle, MQTT_MESSAGE_HANDLE msgHandle)
1025{
1026 int result;
1027 MQTT_CLIENT* mqtt_client = (MQTT_CLIENT*)handle;
1028 if (mqtt_client == NULL || msgHandle == NULL)
1029 {
1030 /*Codes_SRS_MQTT_CLIENT_07_019: [If one of the parameters handle or msgHandle is NULL then mqtt_client_publish shall return a non-zero value.]*/
1031 LogError("Invalid parameter specified mqtt_client: %p, msgHandle: %p", mqtt_client, msgHandle);
1032 result = MU_FAILURE;
1033 }
1034 else
1035 {
1036 /*Codes_SRS_MQTT_CLIENT_07_021: [mqtt_client_publish shall get the message information from the MQTT_MESSAGE_HANDLE.]*/
1037 const APP_PAYLOAD* payload = mqttmessage_getApplicationMsg(msgHandle);
1038 if (payload == NULL)
1039 {
1040 /*Codes_SRS_MQTT_CLIENT_07_020: [If any failure is encountered then mqtt_client_unsubscribe shall return a non-zero value.]*/
1041 LogError("Error: mqttmessage_getApplicationMsg failed");
1042 result = MU_FAILURE;
1043 }
1044 else
1045 {
1046 STRING_HANDLE trace_log = construct_trace_log_handle(mqtt_client);
1047
1048 QOS_VALUE qos = mqttmessage_getQosType(msgHandle);
1049 bool isDuplicate = mqttmessage_getIsDuplicateMsg(msgHandle);
1050 bool isRetained = mqttmessage_getIsRetained(msgHandle);
1051 uint16_t packetId = mqttmessage_getPacketId(msgHandle);
1052 const char* topicName = mqttmessage_getTopicName(msgHandle);
1053 BUFFER_HANDLE publishPacket = mqtt_codec_publish(qos, isDuplicate, isRetained, packetId, topicName, payload->message, payload->length, trace_log);
1054 if (publishPacket == NULL)
1055 {
1056 /*Codes_SRS_MQTT_CLIENT_07_020: [If any failure is encountered then mqtt_client_unsubscribe shall return a non-zero value.]*/
1057 LogError("Error: mqtt_codec_publish failed");
1058 result = MU_FAILURE;
1059 }
1060 else
1061 {
1062 mqtt_client->packetState = PUBLISH_TYPE;
1063
1064 /*Codes_SRS_MQTT_CLIENT_07_022: [On success mqtt_client_publish shall send the MQTT SUBCRIBE packet to the endpoint.]*/
1065 size_t size = BUFFER_length(publishPacket);
1066 if (sendPacketItem(mqtt_client, BUFFER_u_char(publishPacket), size) != 0)
1067 {
1068 /*Codes_SRS_MQTT_CLIENT_07_020: [If any failure is encountered then mqtt_client_unsubscribe shall return a non-zero value.]*/
1069 LogError("Error: mqtt_client_publish send failed");
1070 result = MU_FAILURE;
1071 }
1072 else
1073 {
1074 log_outgoing_trace(mqtt_client, trace_log);
1075 result = 0;
1076 }
1077 BUFFER_delete(publishPacket);
1078 }
1079 if (trace_log != NULL)
1080 {
1081 STRING_delete(trace_log);
1082 }
1083 }
1084 }
1085 return result;
1086}
1087
1088int mqtt_client_subscribe(MQTT_CLIENT_HANDLE handle, uint16_t packetId, SUBSCRIBE_PAYLOAD* subscribeList, size_t count)
1089{
1090 int result;
1091 MQTT_CLIENT* mqtt_client = (MQTT_CLIENT*)handle;
1092 if (mqtt_client == NULL || subscribeList == NULL || count == 0 || packetId == 0)
1093 {
1094 /*Codes_SRS_MQTT_CLIENT_07_013: [If any of the parameters handle, subscribeList is NULL or count is 0 then mqtt_client_subscribe shall return a non-zero value.]*/
1095 LogError("Invalid parameter specified mqtt_client: %p, subscribeList: %p, count: %lu, packetId: %d", mqtt_client, subscribeList, (unsigned long)count, packetId);
1096 result = MU_FAILURE;
1097 }
1098 else
1099 {
1100 STRING_HANDLE trace_log = construct_trace_log_handle(mqtt_client);
1101
1102 BUFFER_HANDLE subPacket = mqtt_codec_subscribe(packetId, subscribeList, count, trace_log);
1103 if (subPacket == NULL)
1104 {
1105 /*Codes_SRS_MQTT_CLIENT_07_014: [If any failure is encountered then mqtt_client_subscribe shall return a non-zero value.]*/
1106 LogError("Error: mqtt_codec_subscribe failed");
1107 result = MU_FAILURE;
1108 }
1109 else
1110 {
1111 mqtt_client->packetState = SUBSCRIBE_TYPE;
1112
1113 size_t size = BUFFER_length(subPacket);
1114 /*Codes_SRS_MQTT_CLIENT_07_015: [On success mqtt_client_subscribe shall send the MQTT SUBCRIBE packet to the endpoint.]*/
1115 if (sendPacketItem(mqtt_client, BUFFER_u_char(subPacket), size) != 0)
1116 {
1117 /*Codes_SRS_MQTT_CLIENT_07_014: [If any failure is encountered then mqtt_client_subscribe shall return a non-zero value.]*/
1118 LogError("Error: mqtt_client_subscribe send failed");
1119 result = MU_FAILURE;
1120 }
1121 else
1122 {
1123 log_outgoing_trace(mqtt_client, trace_log);
1124 result = 0;
1125 }
1126 BUFFER_delete(subPacket);
1127 }
1128 if (trace_log != NULL)
1129 {
1130 STRING_delete(trace_log);
1131 }
1132 }
1133 return result;
1134}
1135
1136int mqtt_client_unsubscribe(MQTT_CLIENT_HANDLE handle, uint16_t packetId, const char** unsubscribeList, size_t count)
1137{
1138 int result;
1139 MQTT_CLIENT* mqtt_client = (MQTT_CLIENT*)handle;
1140 if (mqtt_client == NULL || unsubscribeList == NULL || count == 0 || packetId == 0)
1141 {
1142 /*Codes_SRS_MQTT_CLIENT_07_016: [If any of the parameters handle, unsubscribeList is NULL or count is 0 then mqtt_client_unsubscribe shall return a non-zero value.]*/
1143 LogError("Invalid parameter specified mqtt_client: %p, unsubscribeList: %p, count: %lu, packetId: %d", mqtt_client, unsubscribeList, (unsigned long)count, packetId);
1144 result = MU_FAILURE;
1145 }
1146 else
1147 {
1148 STRING_HANDLE trace_log = construct_trace_log_handle(mqtt_client);
1149
1150 BUFFER_HANDLE unsubPacket = mqtt_codec_unsubscribe(packetId, unsubscribeList, count, trace_log);
1151 if (unsubPacket == NULL)
1152 {
1153 /*Codes_SRS_MQTT_CLIENT_07_017: [If any failure is encountered then mqtt_client_unsubscribe shall return a non-zero value.]*/
1154 LogError("Error: mqtt_codec_unsubscribe failed");
1155 result = MU_FAILURE;
1156 }
1157 else
1158 {
1159 mqtt_client->packetState = UNSUBSCRIBE_TYPE;
1160
1161 size_t size = BUFFER_length(unsubPacket);
1162 /*Codes_SRS_MQTT_CLIENT_07_018: [On success mqtt_client_unsubscribe shall send the MQTT SUBCRIBE packet to the endpoint.]*/
1163 if (sendPacketItem(mqtt_client, BUFFER_u_char(unsubPacket), size) != 0)
1164 {
1165 /*Codes_SRS_MQTT_CLIENT_07_017: [If any failure is encountered then mqtt_client_unsubscribe shall return a non-zero value.].]*/
1166 LogError("Error: mqtt_client_unsubscribe send failed");
1167 result = MU_FAILURE;
1168 }
1169 else
1170 {
1171 log_outgoing_trace(mqtt_client, trace_log);
1172 result = 0;
1173 }
1174 BUFFER_delete(unsubPacket);
1175 }
1176 if (trace_log != NULL)
1177 {
1178 STRING_delete(trace_log);
1179 }
1180 }
1181 return result;
1182}
1183
1184int mqtt_client_disconnect(MQTT_CLIENT_HANDLE handle, ON_MQTT_DISCONNECTED_CALLBACK callback, void* ctx)
1185{
1186 int result;
1187 MQTT_CLIENT* mqtt_client = (MQTT_CLIENT*)handle;
1188 if (mqtt_client == NULL)
1189 {
1190 /*Codes_SRS_MQTT_CLIENT_07_010: [If the parameters handle is NULL then mqtt_client_disconnect shall return a non-zero value.]*/
1191 result = MU_FAILURE;
1192 }
1193 else
1194 {
1195 if (mqtt_client->clientConnected)
1196 {
1197 BUFFER_HANDLE disconnectPacket = mqtt_codec_disconnect();
1198 if (disconnectPacket == NULL)
1199 {
1200 /*Codes_SRS_MQTT_CLIENT_07_011: [If any failure is encountered then mqtt_client_disconnect shall return a non-zero value.]*/
1201 LogError("Error: mqtt_client_disconnect failed");
1202 mqtt_client->packetState = PACKET_TYPE_ERROR;
1203 result = MU_FAILURE;
1204 }
1205 else
1206 {
1207 /* Codes_SRS_MQTT_CLIENT_07_037: [ if callback is not NULL callback shall be called once the mqtt connection has been disconnected ] */
1208 mqtt_client->disconnect_cb = callback;
1209 mqtt_client->disconnect_ctx = ctx;
1210 mqtt_client->packetState = DISCONNECT_TYPE;
1211
1212 size_t size = BUFFER_length(disconnectPacket);
1213 /*Codes_SRS_MQTT_CLIENT_07_012: [On success mqtt_client_disconnect shall send the MQTT DISCONNECT packet to the endpoint.]*/
1214 if (sendPacketItem(mqtt_client, BUFFER_u_char(disconnectPacket), size) != 0)
1215 {
1216 /*Codes_SRS_MQTT_CLIENT_07_011: [If any failure is encountered then mqtt_client_disconnect shall return a non-zero value.]*/
1217 LogError("Error: mqtt_client_disconnect send failed");
1218 result = MU_FAILURE;
1219 }
1220 else
1221 {
1222#ifndef NO_LOGGING
1223 if (mqtt_client->logTrace)
1224 {
1225 STRING_HANDLE trace_log = STRING_construct("DISCONNECT");
1226 log_outgoing_trace(mqtt_client, trace_log);
1227 STRING_delete(trace_log);
1228 }
1229#endif
1230 result = 0;
1231 }
1232 BUFFER_delete(disconnectPacket);
1233 }
1234 clear_mqtt_options(mqtt_client);
1235 }
1236 else
1237 {
1238 // If the client is not connected then just close the underlying socket
1239 mqtt_client->disconnect_cb = callback;
1240 mqtt_client->disconnect_ctx = ctx;
1241
1242 close_connection(mqtt_client);
1243 clear_mqtt_options(mqtt_client);
1244 result = 0;
1245 }
1246 }
1247 return result;
1248}
1249
1250void mqtt_client_dowork(MQTT_CLIENT_HANDLE handle)
1251{
1252 MQTT_CLIENT* mqtt_client = (MQTT_CLIENT*)handle;
1253 /*Codes_SRS_MQTT_CLIENT_18_001: [If the client is disconnected, mqtt_client_dowork shall do nothing.]*/
1254 /*Codes_SRS_MQTT_CLIENT_07_023: [If the parameter handle is NULL then mqtt_client_dowork shall do nothing.]*/
1255 if (mqtt_client != NULL && mqtt_client->xioHandle != NULL)
1256 {
1257 /*Codes_SRS_MQTT_CLIENT_07_024: [mqtt_client_dowork shall call the xio_dowork function to complete operations.]*/
1258 xio_dowork(mqtt_client->xioHandle);
1259
1260 /*Codes_SRS_MQTT_CLIENT_07_025: [mqtt_client_dowork shall retrieve the the last packet send value and ...]*/
1261 if (mqtt_client->socketConnected && mqtt_client->clientConnected && mqtt_client->keepAliveInterval > 0)
1262 {
1263 tickcounter_ms_t current_ms;
1264 if (tickcounter_get_current_ms(mqtt_client->packetTickCntr, &current_ms) != 0)
1265 {
1266 LogError("Error: tickcounter_get_current_ms failed");
1267 }
1268 else
1269 {
1270 /* Codes_SRS_MQTT_CLIENT_07_035: [If the timeSincePing has expired past the maxPingRespTime then mqtt_client_dowork shall call the Error Callback function with the message MQTT_CLIENT_NO_PING_RESPONSE] */
1271 if (mqtt_client->timeSincePing > 0 && ((current_ms - mqtt_client->timeSincePing)/1000) > mqtt_client->maxPingRespTime)
1272 {
1273 // We haven't gotten a ping response in the alloted time
1274 set_error_callback(mqtt_client, MQTT_CLIENT_NO_PING_RESPONSE);
1275 mqtt_client->timeSincePing = 0;
1276 mqtt_client->packetSendTimeMs = 0;
1277 mqtt_client->packetState = UNKNOWN_TYPE;
1278 }
1279 else if (((current_ms - mqtt_client->packetSendTimeMs) / 1000) >= mqtt_client->keepAliveInterval)
1280 {
1281 /*Codes_SRS_MQTT_CLIENT_07_026: [if keepAliveInternal is > 0 and the send time is greater than the MQTT KeepAliveInterval then it shall construct an MQTT PINGREQ packet.]*/
1282 BUFFER_HANDLE pingPacket = mqtt_codec_ping();
1283 if (pingPacket != NULL)
1284 {
1285 size_t size = BUFFER_length(pingPacket);
1286 (void)sendPacketItem(mqtt_client, BUFFER_u_char(pingPacket), size);
1287 BUFFER_delete(pingPacket);
1288 (void)tickcounter_get_current_ms(mqtt_client->packetTickCntr, &mqtt_client->timeSincePing);
1289#ifndef NO_LOGGING
1290 if (mqtt_client->logTrace)
1291 {
1292 STRING_HANDLE trace_log = STRING_construct("PINGREQ");
1293 log_outgoing_trace(mqtt_client, trace_log);
1294 STRING_delete(trace_log);
1295 }
1296#endif
1297 }
1298 }
1299 }
1300 }
1301 }
1302}
1303
1304void mqtt_client_set_trace(MQTT_CLIENT_HANDLE handle, bool traceOn, bool rawBytesOn)
1305{
1306 AZURE_UNREFERENCED_PARAMETER(handle);
1307 AZURE_UNREFERENCED_PARAMETER(traceOn);
1308 AZURE_UNREFERENCED_PARAMETER(rawBytesOn);
1309#ifndef NO_LOGGING
1310 MQTT_CLIENT* mqtt_client = (MQTT_CLIENT*)handle;
1311 if (mqtt_client != NULL)
1312 {
1313 mqtt_client->logTrace = traceOn;
1314#ifdef ENABLE_RAW_TRACE
1315 mqtt_client->rawBytesTrace = rawBytesOn;
1316#endif
1317 }
1318#endif
1319}
Note: See TracBrowser for help on using the repository browser.