source: azure_iot_hub/trunk/azure_iothub/umqtt/src/mqtt_client.c@ 389

Last change on this file since 389 was 389, checked in by coas-nagasima, 5 years ago

ビルドが通るよう更新

  • Property svn:eol-style set to native
  • Property svn:mime-type set to text/x-csrc;charset=UTF-8
File size: 49.1 KB
Line 
1// Copyright (c) Microsoft. All rights reserved.
2// Licensed under the MIT license. See LICENSE file in the project root for full license information.
3
4#include <stdlib.h>
5#include "azure_c_shared_utility/agenttime.h"
6#include "azure_c_shared_utility/const_defines.h"
7#include "azure_c_shared_utility/crt_abstractions.h"
8#include "azure_c_shared_utility/gballoc.h"
9#include "azure_c_shared_utility/optimize_size.h"
10#include "azure_c_shared_utility/macro_utils.h"
11#include "azure_c_shared_utility/platform.h"
12#include "azure_c_shared_utility/strings.h"
13#include "azure_c_shared_utility/threadapi.h"
14#include "azure_c_shared_utility/tickcounter.h"
15#include "azure_c_shared_utility/xlogging.h"
16#include "azure_c_shared_utility/macro_utils.h"
17
18#include "azure_umqtt_c/mqtt_client.h"
19#include "azure_umqtt_c/mqtt_codec.h"
20#include <inttypes.h>
21
/* NOTE(review): VARIABLE_HEADER_OFFSET is not referenced in the visible part of this file. */
#define VARIABLE_HEADER_OFFSET              0x02

/* Bits tested against the PUBLISH flags in ProcessPublishMessage. */
#define RETAIN_FLAG_MASK                    0x01
#define QOS_LEAST_ONCE_FLAG_MASK            0x02
#define QOS_EXACTLY_ONCE_FLAG_MASK          0x04
#define DUPLICATE_FLAG_MASK                 0x08

/* Mask applied to a packet byte before it is matched against the control packet types. */
#define CONNECT_PACKET_MASK                 0xF0

/* Size of the stack buffers that receive the "%H:%M:%S" trace timestamp. */
#define TIME_MAX_BUFFER                     16

/*
 * Default for MQTT_CLIENT.maxPingRespTime; mqtt_client_connect caps it at
 * keepAliveInterval / 2. The original comment read "% of time to send pings".
 * NOTE(review): confirm the intended unit against the code that consumes it.
 */
#define DEFAULT_MAX_PING_RESPONSE_TIME      80

/* Upper bound on the xio_dowork polling iterations performed by close_connection. */
#define MAX_CLOSE_RETRIES                   20

/* Text used for boolean values in trace output. */
static const char* const TRUE_CONST = "true";
static const char* const FALSE_CONST = "false";

MU_DEFINE_ENUM_STRINGS(QOS_VALUE, QOS_VALUE_VALUES);
36
37typedef struct MQTT_CLIENT_TAG
38{
39 XIO_HANDLE xioHandle;
40 MQTTCODEC_HANDLE codec_handle;
41 CONTROL_PACKET_TYPE packetState;
42 TICK_COUNTER_HANDLE packetTickCntr;
43 tickcounter_ms_t packetSendTimeMs;
44 ON_MQTT_OPERATION_CALLBACK fnOperationCallback;
45 ON_MQTT_MESSAGE_RECV_CALLBACK fnMessageRecv;
46 void* ctx;
47 ON_MQTT_ERROR_CALLBACK fnOnErrorCallBack;
48 void* errorCBCtx;
49 ON_MQTT_DISCONNECTED_CALLBACK disconnect_cb;
50 void* disconnect_ctx;
51 QOS_VALUE qosValue;
52 uint16_t keepAliveInterval;
53 MQTT_CLIENT_OPTIONS mqttOptions;
54 bool clientConnected;
55 bool socketConnected;
56 bool logTrace;
57 bool rawBytesTrace;
58 tickcounter_ms_t timeSincePing;
59 uint16_t maxPingRespTime;
60} MQTT_CLIENT;
61
62static void on_connection_closed(void* context)
63{
64 MQTT_CLIENT* mqtt_client = (MQTT_CLIENT*)context;
65 if (mqtt_client != NULL)
66 {
67 mqtt_client->socketConnected = false;
68 mqtt_client->clientConnected = false;
69 if (mqtt_client->disconnect_cb)
70 {
71 mqtt_client->disconnect_cb(mqtt_client->disconnect_ctx);
72 }
73 }
74}
75
76static void close_connection(MQTT_CLIENT* mqtt_client)
77{
78 if (mqtt_client->socketConnected)
79 {
80 (void)xio_close(mqtt_client->xioHandle, on_connection_closed, mqtt_client);
81 if (mqtt_client->disconnect_cb == NULL)
82 {
83 size_t counter = 0;
84 do
85 {
86 xio_dowork(mqtt_client->xioHandle);
87 counter++;
88 ThreadAPI_Sleep(2);
89 } while (mqtt_client->clientConnected && counter < MAX_CLOSE_RETRIES);
90 }
91 // Clear the handle because we don't use it anymore
92 mqtt_client->xioHandle = NULL;
93 }
94 else
95 {
96 if (mqtt_client->disconnect_cb)
97 {
98 mqtt_client->disconnect_cb(mqtt_client->disconnect_ctx);
99 }
100 }
101 mqtt_client->xioHandle = NULL;
102}
103
104static void set_error_callback(MQTT_CLIENT* mqtt_client, MQTT_CLIENT_EVENT_ERROR error_type)
105{
106 if (mqtt_client->fnOnErrorCallBack)
107 {
108 mqtt_client->fnOnErrorCallBack(mqtt_client, error_type, mqtt_client->errorCBCtx);
109 }
110 close_connection(mqtt_client);
111}
112
113static STRING_HANDLE construct_trace_log_handle(MQTT_CLIENT* mqtt_client)
114{
115 STRING_HANDLE trace_log;
116 if (mqtt_client->logTrace)
117 {
118 trace_log = STRING_new();
119 }
120 else
121 {
122 trace_log = NULL;
123 }
124 return trace_log;
125}
126
/*
 * Reads a big-endian 16-bit value from *buffer and advances *buffer by two.
 *
 * buffer   in/out cursor into the packet bytes.
 * byteLen  number of bytes available at *buffer.
 *
 * Returns the decoded value. When buffer or *buffer is NULL, or fewer than
 * two bytes are available, an error is logged, the cursor is left untouched
 * and 0 is returned.
 */
static uint16_t byteutil_read_uint16(uint8_t** buffer, size_t byteLen)
{
    if (buffer == NULL || *buffer == NULL || byteLen < 2)
    {
        LogError("byteutil_read_uint16 == NULL or less than 2");
        return 0;
    }

    const uint8_t* cursor = *buffer;
    uint16_t value = (uint16_t)((cursor[0] << 8) | cursor[1]);

    *buffer += 2; // Move the ptr
    return value;
}
141
/*
 * Reads a length-prefixed string (2-byte big-endian length followed by that
 * many bytes) from *buffer.
 *
 * buffer   in/out cursor; advanced past the length prefix and, on success,
 *          past the string bytes as well.
 * byteLen  in: number of bytes available at *buffer.
 *          out: on success, the length of the returned string.
 *
 * Returns a malloc'ed, NUL-terminated copy that the caller must free, or
 * NULL when an argument is NULL, the length prefix cannot be read, the
 * string is empty, the string would run past the available bytes, or the
 * allocation fails.
 */
static char* byteutil_readUTF(uint8_t** buffer, size_t* byteLen)
{
    char* result = NULL;

    // Validate before touching *buffer / *byteLen; the original dereferenced them first.
    if (buffer == NULL || *buffer == NULL || byteLen == NULL)
    {
        LogError("byteutil_readUTF: buffer or byteLen is NULL.");
    }
    else
    {
        const uint8_t* bufferInitial = *buffer;
        // Get the length of the string
        uint16_t stringLen = byteutil_read_uint16(buffer, *byteLen);
        // Verify that byteutil_read_uint16 succeeded (by stringLen>0) and that we're
        // not being asked to read a string longer than buffer passed in.
        if ((stringLen > 0) && (((size_t)stringLen + (size_t)(*buffer - bufferInitial)) <= *byteLen))
        {
            result = (char*)malloc((size_t)stringLen + 1);
            if (result != NULL)
            {
                (void)memcpy(result, *buffer, stringLen);
                result[stringLen] = '\0';
                *buffer += stringLen;
                *byteLen = stringLen;
            }
            else
            {
                LogError("byteutil_readUTF: failure allocating string.");
            }
        }
        else
        {
            LogError("String passed not a valid UTF.");
        }
    }

    return result;
}
169
/*
 * Reads one byte from *buffer and advances *buffer by one.
 *
 * Returns the byte read. When buffer or *buffer is NULL an error is logged,
 * nothing is advanced and 0 is returned. The caller is responsible for
 * making sure at least one byte is available; no length is passed in.
 */
static uint8_t byteutil_readByte(uint8_t** buffer)
{
    uint8_t result = 0;
    // Check the cursor itself as well, matching byteutil_read_uint16.
    if (buffer != NULL && *buffer != NULL)
    {
        result = **buffer;
        (*buffer)++;
    }
    else
    {
        LogError("readByte buffer == NULL.");
    }
    return result;
}
184
185static void sendComplete(void* context, IO_SEND_RESULT send_result)
186{
187 MQTT_CLIENT* mqtt_client = (MQTT_CLIENT*)context;
188 if (mqtt_client != NULL)
189 {
190 if (send_result == IO_SEND_OK)
191 {
192 if (mqtt_client->packetState == DISCONNECT_TYPE)
193 {
194 /*Codes_SRS_MQTT_CLIENT_07_032: [If the actionResult parameter is of type MQTT_CLIENT_ON_DISCONNECT the the msgInfo value shall be NULL.]*/
195 if (mqtt_client->fnOperationCallback != NULL)
196 {
197 mqtt_client->fnOperationCallback(mqtt_client, MQTT_CLIENT_ON_DISCONNECT, NULL, mqtt_client->ctx);
198 }
199 // close the xio
200 close_connection(mqtt_client);
201 }
202 }
203 else if (send_result == IO_SEND_ERROR)
204 {
205 LogError("MQTT Send Complete Failure send_result: %d", (int)send_result);
206 set_error_callback(mqtt_client, MQTT_CLIENT_COMMUNICATION_ERROR);
207 }
208 }
209 else
210 {
211 LogError("MQTT Send Complete Failure with NULL mqtt_client");
212 }
213}
214
215#ifndef NO_LOGGING
/*
 * Writes the current local time as "HH:MM:SS" into timeResult.
 *
 * timeResult  destination buffer; nothing happens when it is NULL.
 * len         capacity of timeResult, forwarded to strftime.
 *
 * If the time cannot be obtained, converted or formatted, timeResult is set
 * to the empty string.
 */
static void getLogTime(char* timeResult, size_t len)
{
    if (timeResult == NULL)
    {
        return;
    }

    bool formatted = false;
    time_t now = get_time(NULL);
    if (now != (time_t)-1)
    {
        struct tm* localNow = localtime(&now);
        if (localNow != NULL && strftime(timeResult, len, "%H:%M:%S", localNow) != 0)
        {
            formatted = true;
        }
    }

    if (!formatted)
    {
        timeResult[0] = '\0';
    }
}
242
243#ifdef ENABLE_RAW_TRACE
244static const char* retrievePacketType(CONTROL_PACKET_TYPE packet)
245{
246 switch (packet&CONNECT_PACKET_MASK)
247 {
248 case CONNECT_TYPE: return "CONNECT";
249 case CONNACK_TYPE: return "CONNACK";
250 case PUBLISH_TYPE: return "PUBLISH";
251 case PUBACK_TYPE: return "PUBACK";
252 case PUBREC_TYPE: return "PUBREC";
253 case PUBREL_TYPE: return "PUBREL";
254 case SUBSCRIBE_TYPE: return "SUBSCRIBE";
255 case SUBACK_TYPE: return "SUBACK";
256 case UNSUBSCRIBE_TYPE: return "UNSUBSCRIBE";
257 case UNSUBACK_TYPE: return "UNSUBACK";
258 case PINGREQ_TYPE: return "PINGREQ";
259 case PINGRESP_TYPE: return "PINGRESP";
260 case DISCONNECT_TYPE: return "DISCONNECT";
261 default:
262 case PACKET_TYPE_ERROR:
263 case UNKNOWN_TYPE:
264 return "UNKNOWN";
265 }
266}
267
268static void logOutgoingRawTrace(MQTT_CLIENT* mqtt_client, const uint8_t* data, size_t length)
269{
270 if (mqtt_client != NULL && data != NULL && length > 0 && mqtt_client->rawBytesTrace)
271 {
272 char tmBuffer[TIME_MAX_BUFFER];
273 getLogTime(tmBuffer, TIME_MAX_BUFFER);
274
275 LOG(AZ_LOG_TRACE, 0, "-> %s %s: ", tmBuffer, retrievePacketType((unsigned char)data[0]));
276 size_t index = 0;
277 for (index = 0; index < length; index++)
278 {
279 LOG(AZ_LOG_TRACE, 0, "0x%02x ", data[index]);
280 }
281 LOG(AZ_LOG_TRACE, LOG_LINE, " ");
282 }
283}
284
285static void logIncomingRawTrace(MQTT_CLIENT* mqtt_client, CONTROL_PACKET_TYPE packet, uint8_t flags, const uint8_t* data, size_t length)
286{
287 if (mqtt_client != NULL && mqtt_client->rawBytesTrace)
288 {
289 if (data != NULL && length > 0)
290 {
291 char tmBuffer[TIME_MAX_BUFFER];
292 getLogTime(tmBuffer, TIME_MAX_BUFFER);
293
294 LOG(AZ_LOG_TRACE, 0, "<- %s %s: 0x%02x 0x%02x ", tmBuffer, retrievePacketType((CONTROL_PACKET_TYPE)packet), (unsigned int)(packet | flags), (unsigned int)length);
295 size_t index = 0;
296 for (index = 0; index < length; index++)
297 {
298 LOG(AZ_LOG_TRACE, 0, "0x%02x ", data[index]);
299 }
300 LOG(AZ_LOG_TRACE, LOG_LINE, " ");
301 }
302 else if (packet == PINGRESP_TYPE)
303 {
304 char tmBuffer[TIME_MAX_BUFFER];
305 getLogTime(tmBuffer, TIME_MAX_BUFFER);
306 LOG(AZ_LOG_TRACE, LOG_LINE, "<- %s %s: 0x%02x 0x%02x ", tmBuffer, retrievePacketType((CONTROL_PACKET_TYPE)packet), (unsigned int)(packet | flags), (unsigned int)length);
307 }
308 }
309}
310#endif // ENABLE_RAW_TRACE
311
312static void log_outgoing_trace(MQTT_CLIENT* mqtt_client, STRING_HANDLE trace_log)
313{
314 if (mqtt_client != NULL && mqtt_client->logTrace && trace_log != NULL)
315 {
316 char tmBuffer[TIME_MAX_BUFFER];
317 getLogTime(tmBuffer, TIME_MAX_BUFFER);
318 LOG(AZ_LOG_TRACE, LOG_LINE, "-> %s %s", tmBuffer, STRING_c_str(trace_log));
319 }
320}
321
322static void log_incoming_trace(MQTT_CLIENT* mqtt_client, STRING_HANDLE trace_log)
323{
324 if (mqtt_client != NULL && mqtt_client->logTrace && trace_log != NULL)
325 {
326 char tmBuffer[TIME_MAX_BUFFER];
327 getLogTime(tmBuffer, TIME_MAX_BUFFER);
328 LOG(AZ_LOG_TRACE, LOG_LINE, "<- %s %s", tmBuffer, STRING_c_str(trace_log) );
329 }
330}
331#else // NO_LOGGING
332static void logOutgoingRawTrace(MQTT_CLIENT* mqtt_client, const uint8_t* data, size_t length)
333{
334 AZURE_UNREFERENCED_PARAMETER(mqtt_client);
335 AZURE_UNREFERENCED_PARAMETER(data);
336 AZURE_UNREFERENCED_PARAMETER(length);
337}
338
339static void log_outgoing_trace(MQTT_CLIENT* mqtt_client, STRING_HANDLE trace_log)
340{
341 AZURE_UNREFERENCED_PARAMETER(mqtt_client);
342 AZURE_UNREFERENCED_PARAMETER(trace_log);
343}
344
345static void log_incoming_trace(MQTT_CLIENT* mqtt_client, STRING_HANDLE trace_log)
346{
347 AZURE_UNREFERENCED_PARAMETER(mqtt_client);
348 AZURE_UNREFERENCED_PARAMETER(trace_log);
349}
350
351static void logIncomingRawTrace(MQTT_CLIENT* mqtt_client, CONTROL_PACKET_TYPE packet, uint8_t flags, const uint8_t* data, size_t length)
352{
353 AZURE_UNREFERENCED_PARAMETER(mqtt_client);
354 AZURE_UNREFERENCED_PARAMETER(packet);
355 AZURE_UNREFERENCED_PARAMETER(flags);
356 AZURE_UNREFERENCED_PARAMETER(data);
357 AZURE_UNREFERENCED_PARAMETER(length);
358}
359#endif // NO_LOGGING
360
361static int sendPacketItem(MQTT_CLIENT* mqtt_client, const unsigned char* data, size_t length)
362{
363 int result;
364
365 if (tickcounter_get_current_ms(mqtt_client->packetTickCntr, &mqtt_client->packetSendTimeMs) != 0)
366 {
367 LogError("Failure getting current ms tickcounter");
368 result = MU_FAILURE;
369 }
370 else
371 {
372 result = xio_send(mqtt_client->xioHandle, (const void*)data, length, sendComplete, mqtt_client);
373 if (result != 0)
374 {
375 LogError("%d: Failure sending control packet data", result);
376 result = MU_FAILURE;
377 }
378 else
379 {
380#ifdef ENABLE_RAW_TRACE
381 logOutgoingRawTrace(mqtt_client, (const uint8_t*)data, length);
382#endif
383 }
384 }
385 return result;
386}
387
388static void onOpenComplete(void* context, IO_OPEN_RESULT open_result)
389{
390 MQTT_CLIENT* mqtt_client = (MQTT_CLIENT*)context;
391 if (mqtt_client != NULL)
392 {
393 if (open_result == IO_OPEN_OK && !mqtt_client->socketConnected)
394 {
395 mqtt_client->packetState = CONNECT_TYPE;
396 mqtt_client->socketConnected = true;
397
398 STRING_HANDLE trace_log = construct_trace_log_handle(mqtt_client);
399
400 // Send the Connect packet
401 BUFFER_HANDLE connPacket = mqtt_codec_connect(&mqtt_client->mqttOptions, trace_log);
402 if (connPacket == NULL)
403 {
404 LogError("Error: mqtt_codec_connect failed");
405 }
406 else
407 {
408 size_t size = BUFFER_length(connPacket);
409 /*Codes_SRS_MQTT_CLIENT_07_009: [On success mqtt_client_connect shall send the MQTT CONNECT to the endpoint.]*/
410 if (sendPacketItem(mqtt_client, BUFFER_u_char(connPacket), size) != 0)
411 {
412 LogError("Error: mqtt_codec_connect failed");
413 }
414 else
415 {
416 log_outgoing_trace(mqtt_client, trace_log);
417 }
418 BUFFER_delete(connPacket);
419 }
420 if (trace_log != NULL)
421 {
422 STRING_delete(trace_log);
423 }
424 }
425 else
426 {
427 if (mqtt_client->socketConnected == false && mqtt_client->fnOnErrorCallBack)
428 {
429 mqtt_client->fnOnErrorCallBack(mqtt_client, MQTT_CLIENT_CONNECTION_ERROR, mqtt_client->errorCBCtx);
430 }
431 close_connection(mqtt_client);
432 }
433 }
434 else
435 {
436 LogError("Error: mqtt_client is NULL");
437 }
438}
439
440static void onBytesReceived(void* context, const unsigned char* buffer, size_t size)
441{
442 MQTT_CLIENT* mqtt_client = (MQTT_CLIENT*)context;
443 if (mqtt_client != NULL)
444 {
445 if (mqtt_codec_bytesReceived(mqtt_client->codec_handle, buffer, size) != 0)
446 {
447 set_error_callback(mqtt_client, MQTT_CLIENT_PARSE_ERROR);
448 }
449 }
450 else
451 {
452 LogError("Error: mqtt_client is NULL");
453 }
454}
455
456static void onIoError(void* context)
457{
458 MQTT_CLIENT* mqtt_client = (MQTT_CLIENT*)context;
459 if (mqtt_client != NULL && mqtt_client->fnOperationCallback)
460 {
461 /*Codes_SRS_MQTT_CLIENT_07_032: [If the actionResult parameter is of type MQTT_CLIENT_ON_DISCONNECT the the msgInfo value shall be NULL.]*/
462 /* Codes_SRS_MQTT_CLIENT_07_036: [ If an error is encountered by the ioHandle the mqtt_client shall call xio_close. ] */
463 set_error_callback(mqtt_client, MQTT_CLIENT_CONNECTION_ERROR);
464 }
465 else
466 {
467 LogError("Error invalid parameter: mqtt_client: %p", mqtt_client);
468 }
469}
470
471static void clear_mqtt_options(MQTT_CLIENT* mqtt_client)
472{
473 if (mqtt_client->mqttOptions.clientId != NULL)
474 {
475 free(mqtt_client->mqttOptions.clientId);
476 mqtt_client->mqttOptions.clientId = NULL;
477 }
478
479 if (mqtt_client->mqttOptions.willTopic != NULL)
480 {
481 free(mqtt_client->mqttOptions.willTopic);
482 mqtt_client->mqttOptions.willTopic = NULL;
483 }
484
485 if (mqtt_client->mqttOptions.willMessage != NULL)
486 {
487 free(mqtt_client->mqttOptions.willMessage);
488 mqtt_client->mqttOptions.willMessage = NULL;
489 }
490
491 if (mqtt_client->mqttOptions.username != NULL)
492 {
493 free(mqtt_client->mqttOptions.username);
494 mqtt_client->mqttOptions.username = NULL;
495 }
496
497 if (mqtt_client->mqttOptions.password != NULL)
498 {
499 free(mqtt_client->mqttOptions.password);
500 mqtt_client->mqttOptions.password = NULL;
501 }
502}
503
504static int cloneMqttOptions(MQTT_CLIENT* mqtt_client, const MQTT_CLIENT_OPTIONS* mqttOptions)
505{
506 int result = 0;
507 char* temp_option;
508
509 if (mqttOptions->clientId != NULL)
510 {
511 if (mallocAndStrcpy_s(&temp_option, mqttOptions->clientId) != 0)
512 {
513 result = MU_FAILURE;
514 LogError("mallocAndStrcpy_s clientId");
515 }
516 else
517 {
518 if (mqtt_client->mqttOptions.clientId != NULL)
519 {
520 free(mqtt_client->mqttOptions.clientId);
521 }
522 mqtt_client->mqttOptions.clientId = temp_option;
523 }
524 }
525 if (result == 0 && mqttOptions->willTopic != NULL)
526 {
527 temp_option = NULL;
528 if (mallocAndStrcpy_s(&temp_option, mqttOptions->willTopic) != 0)
529 {
530 result = MU_FAILURE;
531 LogError("mallocAndStrcpy_s willTopic");
532 }
533 else
534 {
535 if (mqtt_client->mqttOptions.willTopic != NULL)
536 {
537 free(mqtt_client->mqttOptions.willTopic);
538 }
539 mqtt_client->mqttOptions.willTopic = temp_option;
540 }
541 }
542 if (result == 0 && mqttOptions->willMessage != NULL)
543 {
544 temp_option = NULL;
545 if (mallocAndStrcpy_s(&temp_option, mqttOptions->willMessage) != 0)
546 {
547 LogError("mallocAndStrcpy_s willMessage");
548 result = MU_FAILURE;
549 }
550 else
551 {
552 if (mqtt_client->mqttOptions.willMessage != NULL)
553 {
554 free(mqtt_client->mqttOptions.willMessage);
555 }
556 mqtt_client->mqttOptions.willMessage = temp_option;
557 }
558 }
559 if (result == 0 && mqttOptions->username != NULL)
560 {
561 temp_option = NULL;
562 if (mallocAndStrcpy_s(&temp_option, mqttOptions->username) != 0)
563 {
564 LogError("mallocAndStrcpy_s username");
565 result = MU_FAILURE;
566 }
567 else
568 {
569 if (mqtt_client->mqttOptions.username != NULL)
570 {
571 free(mqtt_client->mqttOptions.username);
572 }
573 mqtt_client->mqttOptions.username = temp_option;
574 }
575 }
576 if (result == 0 && mqttOptions->password != NULL)
577 {
578 temp_option = NULL;
579 if (mallocAndStrcpy_s(&temp_option, mqttOptions->password) != 0)
580 {
581 LogError("mallocAndStrcpy_s password");
582 result = MU_FAILURE;
583 }
584 else
585 {
586 if (mqtt_client->mqttOptions.password != NULL)
587 {
588 free(mqtt_client->mqttOptions.password);
589 }
590 mqtt_client->mqttOptions.password = temp_option;
591 }
592 }
593 if (result == 0)
594 {
595 mqtt_client->mqttOptions.keepAliveInterval = mqttOptions->keepAliveInterval;
596 mqtt_client->mqttOptions.messageRetain = mqttOptions->messageRetain;
597 mqtt_client->mqttOptions.useCleanSession = mqttOptions->useCleanSession;
598 mqtt_client->mqttOptions.qualityOfServiceValue = mqttOptions->qualityOfServiceValue;
599 }
600 else
601 {
602 clear_mqtt_options(mqtt_client);
603 }
604 return result;
605}
606
607static void ProcessPublishMessage(MQTT_CLIENT* mqtt_client, uint8_t* initialPos, size_t packetLength, int flags)
608{
609 bool isDuplicateMsg = (flags & DUPLICATE_FLAG_MASK) ? true : false;
610 bool isRetainMsg = (flags & RETAIN_FLAG_MASK) ? true : false;
611 QOS_VALUE qosValue = (flags == 0) ? DELIVER_AT_MOST_ONCE : (flags & QOS_LEAST_ONCE_FLAG_MASK) ? DELIVER_AT_LEAST_ONCE : DELIVER_EXACTLY_ONCE;
612
613 uint8_t* iterator = initialPos;
614 size_t numberOfBytesToBeRead = packetLength;
615 size_t lengthOfTopicName = numberOfBytesToBeRead;
616 char* topicName = byteutil_readUTF(&iterator, &lengthOfTopicName);
617 if (topicName == NULL)
618 {
619 LogError("Publish MSG: failure reading topic name");
620 set_error_callback(mqtt_client, MQTT_CLIENT_PARSE_ERROR);
621 }
622 else
623 {
624 STRING_HANDLE trace_log = NULL;
625
626#ifndef NO_LOGGING
627 if (mqtt_client->logTrace)
628 {
629 trace_log = STRING_construct_sprintf("PUBLISH | IS_DUP: %s | RETAIN: %d | QOS: %s | TOPIC_NAME: %s", isDuplicateMsg ? TRUE_CONST : FALSE_CONST,
630 isRetainMsg ? 1 : 0, MU_ENUM_TO_STRING(QOS_VALUE, qosValue), topicName);
631 }
632#endif
633 uint16_t packetId = 0;
634 numberOfBytesToBeRead = packetLength - (iterator - initialPos);
635 if (qosValue != DELIVER_AT_MOST_ONCE)
636 {
637 packetId = byteutil_read_uint16(&iterator, numberOfBytesToBeRead);
638#ifndef NO_LOGGING
639 if (mqtt_client->logTrace)
640 {
641 STRING_sprintf(trace_log, " | PACKET_ID: %"PRIu16, packetId);
642 }
643#endif
644 }
645 if ((qosValue != DELIVER_AT_MOST_ONCE) && (packetId == 0))
646 {
647 LogError("Publish MSG: packetId=0, invalid");
648 set_error_callback(mqtt_client, MQTT_CLIENT_PARSE_ERROR);
649 }
650 else
651 {
652 numberOfBytesToBeRead = packetLength - (iterator - initialPos);
653
654 MQTT_MESSAGE_HANDLE msgHandle = mqttmessage_create_in_place(packetId, topicName, qosValue, iterator, numberOfBytesToBeRead);
655 if (msgHandle == NULL)
656 {
657 LogError("failure in mqttmessage_create");
658 set_error_callback(mqtt_client, MQTT_CLIENT_MEMORY_ERROR);
659 }
660 else if (mqttmessage_setIsDuplicateMsg(msgHandle, isDuplicateMsg) != 0 ||
661 mqttmessage_setIsRetained(msgHandle, isRetainMsg) != 0)
662 {
663 LogError("failure setting mqtt message property");
664 set_error_callback(mqtt_client, MQTT_CLIENT_MEMORY_ERROR);
665 }
666 else
667 {
668#ifndef NO_LOGGING
669 if (mqtt_client->logTrace)
670 {
671 STRING_sprintf(trace_log, " | PAYLOAD_LEN: %lu", (unsigned long)numberOfBytesToBeRead);
672 log_incoming_trace(mqtt_client, trace_log);
673 }
674#endif
675 mqtt_client->fnMessageRecv(msgHandle, mqtt_client->ctx);
676
677 BUFFER_HANDLE pubRel = NULL;
678 if (qosValue == DELIVER_EXACTLY_ONCE)
679 {
680 pubRel = mqtt_codec_publishReceived(packetId);
681 if (pubRel == NULL)
682 {
683 LogError("Failed to allocate publish receive message.");
684 set_error_callback(mqtt_client, MQTT_CLIENT_MEMORY_ERROR);
685 }
686 }
687 else if (qosValue == DELIVER_AT_LEAST_ONCE)
688 {
689 pubRel = mqtt_codec_publishAck(packetId);
690 if (pubRel == NULL)
691 {
692 LogError("Failed to allocate publish ack message.");
693 set_error_callback(mqtt_client, MQTT_CLIENT_MEMORY_ERROR);
694 }
695 }
696 if (pubRel != NULL)
697 {
698 size_t size = BUFFER_length(pubRel);
699 (void)sendPacketItem(mqtt_client, BUFFER_u_char(pubRel), size);
700 BUFFER_delete(pubRel);
701 }
702 }
703 mqttmessage_destroy(msgHandle);
704 }
705
706 if (trace_log != NULL)
707 {
708 STRING_delete(trace_log);
709 }
710
711 free(topicName);
712 }
713}
714
715static void recvCompleteCallback(void* context, CONTROL_PACKET_TYPE packet, int flags, BUFFER_HANDLE headerData)
716{
717 MQTT_CLIENT* mqtt_client = (MQTT_CLIENT*)context;
718 if (mqtt_client != NULL)
719 {
720 size_t packetLength = 0;
721 uint8_t* iterator = NULL;
722 if (headerData != NULL)
723 {
724 packetLength = BUFFER_length(headerData);
725 iterator = BUFFER_u_char(headerData);
726 }
727
728#ifdef ENABLE_RAW_TRACE
729 logIncomingRawTrace(mqtt_client, packet, (uint8_t)flags, iterator, packetLength);
730#endif
731 if ((iterator != NULL && packetLength > 0) || packet == PINGRESP_TYPE)
732 {
733 switch (packet)
734 {
735 case CONNACK_TYPE:
736 {
737 /*Codes_SRS_MQTT_CLIENT_07_028: [If the actionResult parameter is of type CONNECT_ACK then the msgInfo value shall be a CONNECT_ACK structure.]*/
738 CONNECT_ACK connack = { 0 };
739 connack.isSessionPresent = (byteutil_readByte(&iterator) == 0x1) ? true : false;
740 uint8_t rc = byteutil_readByte(&iterator);
741 connack.returnCode =
742 (rc < ((uint8_t)CONN_REFUSED_UNKNOWN)) ?
743 (CONNECT_RETURN_CODE)rc : CONN_REFUSED_UNKNOWN;
744
745#ifndef NO_LOGGING
746 if (mqtt_client->logTrace)
747 {
748 STRING_HANDLE trace_log = STRING_construct_sprintf("CONNACK | SESSION_PRESENT: %s | RETURN_CODE: 0x%x", connack.isSessionPresent ? TRUE_CONST : FALSE_CONST, connack.returnCode);
749 log_incoming_trace(mqtt_client, trace_log);
750 STRING_delete(trace_log);
751 }
752#endif
753 mqtt_client->fnOperationCallback(mqtt_client, MQTT_CLIENT_ON_CONNACK, (void*)&connack, mqtt_client->ctx);
754
755 if (connack.returnCode == CONNECTION_ACCEPTED)
756 {
757 mqtt_client->clientConnected = true;
758 }
759 break;
760 }
761 case PUBLISH_TYPE:
762 {
763 ProcessPublishMessage(mqtt_client, iterator, packetLength, flags);
764 break;
765 }
766 case PUBACK_TYPE:
767 case PUBREC_TYPE:
768 case PUBREL_TYPE:
769 case PUBCOMP_TYPE:
770 {
771 /*Codes_SRS_MQTT_CLIENT_07_029: [If the actionResult parameter are of types PUBACK_TYPE, PUBREC_TYPE, PUBREL_TYPE or PUBCOMP_TYPE then the msgInfo value shall be a PUBLISH_ACK structure.]*/
772 MQTT_CLIENT_EVENT_RESULT action = (packet == PUBACK_TYPE) ? MQTT_CLIENT_ON_PUBLISH_ACK :
773 (packet == PUBREC_TYPE) ? MQTT_CLIENT_ON_PUBLISH_RECV :
774 (packet == PUBREL_TYPE) ? MQTT_CLIENT_ON_PUBLISH_REL : MQTT_CLIENT_ON_PUBLISH_COMP;
775
776 PUBLISH_ACK publish_ack = { 0 };
777 publish_ack.packetId = byteutil_read_uint16(&iterator, packetLength);
778
779#ifndef NO_LOGGING
780 if (mqtt_client->logTrace)
781 {
782 STRING_HANDLE trace_log = STRING_construct_sprintf("%s | PACKET_ID: %"PRIu16, packet == PUBACK_TYPE ? "PUBACK" : (packet == PUBREC_TYPE) ? "PUBREC" : (packet == PUBREL_TYPE) ? "PUBREL" : "PUBCOMP",
783 publish_ack.packetId);
784
785 log_incoming_trace(mqtt_client, trace_log);
786 STRING_delete(trace_log);
787 }
788#endif
789 BUFFER_HANDLE pubRel = NULL;
790 mqtt_client->fnOperationCallback(mqtt_client, action, (void*)&publish_ack, mqtt_client->ctx);
791 if (packet == PUBREC_TYPE)
792 {
793 pubRel = mqtt_codec_publishRelease(publish_ack.packetId);
794 if (pubRel == NULL)
795 {
796 LogError("Failed to allocate publish release message.");
797 set_error_callback(mqtt_client, MQTT_CLIENT_MEMORY_ERROR);
798 }
799 }
800 else if (packet == PUBREL_TYPE)
801 {
802 pubRel = mqtt_codec_publishComplete(publish_ack.packetId);
803 if (pubRel == NULL)
804 {
805 LogError("Failed to allocate publish complete message.");
806 set_error_callback(mqtt_client, MQTT_CLIENT_MEMORY_ERROR);
807 }
808 }
809 if (pubRel != NULL)
810 {
811 size_t size = BUFFER_length(pubRel);
812 (void)sendPacketItem(mqtt_client, BUFFER_u_char(pubRel), size);
813 BUFFER_delete(pubRel);
814 }
815 break;
816 }
817 case SUBACK_TYPE:
818 {
819
820 /*Codes_SRS_MQTT_CLIENT_07_030: [If the actionResult parameter is of type SUBACK_TYPE then the msgInfo value shall be a SUBSCRIBE_ACK structure.]*/
821 SUBSCRIBE_ACK suback = { 0 };
822
823 size_t remainLen = packetLength;
824 suback.packetId = byteutil_read_uint16(&iterator, packetLength);
825 remainLen -= 2;
826
827#ifndef NO_LOGGING
828 STRING_HANDLE trace_log = NULL;
829 if (mqtt_client->logTrace)
830 {
831 trace_log = STRING_construct_sprintf("SUBACK | PACKET_ID: %"PRIu16, suback.packetId);
832 }
833#endif
834 // Allocate the remaining len
835 suback.qosReturn = (QOS_VALUE*)malloc(sizeof(QOS_VALUE)*remainLen);
836 if (suback.qosReturn != NULL)
837 {
838 while (remainLen > 0)
839 {
840 uint8_t qosRet = byteutil_readByte(&iterator);
841 suback.qosReturn[suback.qosCount++] =
842 (qosRet <= ((uint8_t)DELIVER_EXACTLY_ONCE)) ?
843 (QOS_VALUE)qosRet : DELIVER_FAILURE;
844 remainLen--;
845#ifndef NO_LOGGING
846 if (mqtt_client->logTrace)
847 {
848 STRING_sprintf(trace_log, " | RETURN_CODE: %"PRIu16, suback.qosReturn[suback.qosCount-1]);
849 }
850#endif
851 }
852
853#ifndef NO_LOGGING
854 if (mqtt_client->logTrace)
855 {
856 log_incoming_trace(mqtt_client, trace_log);
857 STRING_delete(trace_log);
858 }
859#endif
860 mqtt_client->fnOperationCallback(mqtt_client, MQTT_CLIENT_ON_SUBSCRIBE_ACK, (void*)&suback, mqtt_client->ctx);
861 free(suback.qosReturn);
862 }
863 else
864 {
865 LogError("allocation of quality of service value failed.");
866 set_error_callback(mqtt_client, MQTT_CLIENT_MEMORY_ERROR);
867 }
868 break;
869 }
870 case UNSUBACK_TYPE:
871 {
872 /*Codes_SRS_MQTT_CLIENT_07_031: [If the actionResult parameter is of type UNSUBACK_TYPE then the msgInfo value shall be a UNSUBSCRIBE_ACK structure.]*/
873 UNSUBSCRIBE_ACK unsuback = { 0 };
874 unsuback.packetId = byteutil_read_uint16(&iterator, packetLength);
875
876#ifndef NO_LOGGING
877 if (mqtt_client->logTrace)
878 {
879 STRING_HANDLE trace_log = STRING_construct_sprintf("UNSUBACK | PACKET_ID: %"PRIu16, unsuback.packetId);
880 log_incoming_trace(mqtt_client, trace_log);
881 STRING_delete(trace_log);
882 }
883#endif
884 mqtt_client->fnOperationCallback(mqtt_client, MQTT_CLIENT_ON_UNSUBSCRIBE_ACK, (void*)&unsuback, mqtt_client->ctx);
885 break;
886 }
887 case PINGRESP_TYPE:
888 mqtt_client->timeSincePing = 0;
889#ifndef NO_LOGGING
890 if (mqtt_client->logTrace)
891 {
892 STRING_HANDLE trace_log = STRING_construct_sprintf("PINGRESP");
893 log_incoming_trace(mqtt_client, trace_log);
894 STRING_delete(trace_log);
895 }
896#endif
897 // Forward ping response to operation callback
898 mqtt_client->fnOperationCallback(mqtt_client, MQTT_CLIENT_ON_PING_RESPONSE, NULL, mqtt_client->ctx);
899 break;
900 default:
901 break;
902 }
903 }
904 }
905 else
906 {
907 LogError("recvCompleteCallback context failed.");
908 }
909}
910
911MQTT_CLIENT_HANDLE mqtt_client_init(ON_MQTT_MESSAGE_RECV_CALLBACK msgRecv, ON_MQTT_OPERATION_CALLBACK operation_cb, void* opCallbackCtx, ON_MQTT_ERROR_CALLBACK onErrorCallBack, void* errorCBCtx)
912{
913 MQTT_CLIENT* result;
914 /*Codes_SRS_MQTT_CLIENT_07_001: [If the parameters ON_MQTT_MESSAGE_RECV_CALLBACK is NULL then mqttclient_init shall return NULL.]*/
915 if (msgRecv == NULL || operation_cb == NULL)
916 {
917 LogError("Invalid parameter specified msgRecv: %p, operation_cb: %p", msgRecv, operation_cb);
918 result = NULL;
919 }
920 else
921 {
922 result = malloc(sizeof(MQTT_CLIENT));
923 if (result == NULL)
924 {
925 /*Codes_SRS_MQTT_CLIENT_07_002: [If any failure is encountered then mqttclient_init shall return NULL.]*/
926 LogError("mqtt_client_init failure: Allocation Failure");
927 }
928 else
929 {
930 memset(result, 0, sizeof(MQTT_CLIENT));
931 /*Codes_SRS_MQTT_CLIENT_07_003: [mqttclient_init shall allocate MQTTCLIENT_DATA_INSTANCE and return the MQTTCLIENT_HANDLE on success.]*/
932 result->packetState = UNKNOWN_TYPE;
933 result->fnOperationCallback = operation_cb;
934 result->ctx = opCallbackCtx;
935 result->fnMessageRecv = msgRecv;
936 result->fnOnErrorCallBack = onErrorCallBack;
937 result->errorCBCtx = errorCBCtx;
938 result->qosValue = DELIVER_AT_MOST_ONCE;
939 result->packetTickCntr = tickcounter_create();
940 result->maxPingRespTime = DEFAULT_MAX_PING_RESPONSE_TIME;
941 if (result->packetTickCntr == NULL)
942 {
943 /*Codes_SRS_MQTT_CLIENT_07_002: [If any failure is encountered then mqttclient_init shall return NULL.]*/
944 LogError("mqtt_client_init failure: tickcounter_create failure");
945 free(result);
946 result = NULL;
947 }
948 else
949 {
950 result->codec_handle = mqtt_codec_create(recvCompleteCallback, result);
951 if (result->codec_handle == NULL)
952 {
953 /*Codes_SRS_MQTT_CLIENT_07_002: [If any failure is encountered then mqttclient_init shall return NULL.]*/
954 LogError("mqtt_client_init failure: mqtt_codec_create failure");
955 tickcounter_destroy(result->packetTickCntr);
956 free(result);
957 result = NULL;
958 }
959 }
960 }
961 }
962 return result;
963}
964
965void mqtt_client_deinit(MQTT_CLIENT_HANDLE handle)
966{
967 /*Codes_SRS_MQTT_CLIENT_07_004: [If the parameter handle is NULL then function mqtt_client_deinit shall do nothing.]*/
968 if (handle != NULL)
969 {
970 /*Codes_SRS_MQTT_CLIENT_07_005: [mqtt_client_deinit shall deallocate all memory allocated in this unit.]*/
971 MQTT_CLIENT* mqtt_client = (MQTT_CLIENT*)handle;
972 tickcounter_destroy(mqtt_client->packetTickCntr);
973 mqtt_codec_destroy(mqtt_client->codec_handle);
974 clear_mqtt_options(mqtt_client);
975 free(mqtt_client);
976 }
977}
978
979int mqtt_client_connect(MQTT_CLIENT_HANDLE handle, XIO_HANDLE xioHandle, MQTT_CLIENT_OPTIONS* mqttOptions)
980{
981 int result;
982 /*SRS_MQTT_CLIENT_07_006: [If any of the parameters handle, ioHandle, or mqttOptions are NULL then mqtt_client_connect shall return a non-zero value.]*/
983 if (handle == NULL || mqttOptions == NULL || xioHandle == NULL)
984 {
985 LogError("mqtt_client_connect: NULL argument (handle = %p, mqttOptions = %p, xioHandle: %p)", handle, mqttOptions, xioHandle);
986 result = MU_FAILURE;
987 }
988 else
989 {
990 MQTT_CLIENT* mqtt_client = (MQTT_CLIENT*)handle;
991 mqtt_client->xioHandle = xioHandle;
992 mqtt_client->packetState = UNKNOWN_TYPE;
993 mqtt_client->qosValue = mqttOptions->qualityOfServiceValue;
994 mqtt_client->keepAliveInterval = mqttOptions->keepAliveInterval;
995 mqtt_client->maxPingRespTime = (DEFAULT_MAX_PING_RESPONSE_TIME < mqttOptions->keepAliveInterval/2) ? DEFAULT_MAX_PING_RESPONSE_TIME : mqttOptions->keepAliveInterval/2;
996 if (cloneMqttOptions(mqtt_client, mqttOptions) != 0)
997 {
998 LogError("Error: Clone Mqtt Options failed");
999 result = MU_FAILURE;
1000 }
1001 /*Codes_SRS_MQTT_CLIENT_07_008: [mqtt_client_connect shall open the XIO_HANDLE by calling into the xio_open interface.]*/
1002 else if (xio_open(xioHandle, onOpenComplete, mqtt_client, onBytesReceived, mqtt_client, onIoError, mqtt_client) != 0)
1003 {
1004 /*Codes_SRS_MQTT_CLIENT_07_007: [If any failure is encountered then mqtt_client_connect shall return a non-zero value.]*/
1005 LogError("Error: io_open failed");
1006 result = MU_FAILURE;
1007 mqtt_client->xioHandle = NULL;
1008 // Remove cloned options
1009 clear_mqtt_options(mqtt_client);
1010 }
1011 else
1012 {
1013 result = 0;
1014 }
1015 }
1016 return result;
1017}
1018
1019int mqtt_client_publish(MQTT_CLIENT_HANDLE handle, MQTT_MESSAGE_HANDLE msgHandle)
1020{
1021 int result;
1022 MQTT_CLIENT* mqtt_client = (MQTT_CLIENT*)handle;
1023 if (mqtt_client == NULL || msgHandle == NULL)
1024 {
1025 /*Codes_SRS_MQTT_CLIENT_07_019: [If one of the parameters handle or msgHandle is NULL then mqtt_client_publish shall return a non-zero value.]*/
1026 LogError("Invalid parameter specified mqtt_client: %p, msgHandle: %p", mqtt_client, msgHandle);
1027 result = MU_FAILURE;
1028 }
1029 else
1030 {
1031 /*Codes_SRS_MQTT_CLIENT_07_021: [mqtt_client_publish shall get the message information from the MQTT_MESSAGE_HANDLE.]*/
1032 const APP_PAYLOAD* payload = mqttmessage_getApplicationMsg(msgHandle);
1033 if (payload == NULL)
1034 {
1035 /*Codes_SRS_MQTT_CLIENT_07_020: [If any failure is encountered then mqtt_client_unsubscribe shall return a non-zero value.]*/
1036 LogError("Error: mqttmessage_getApplicationMsg failed");
1037 result = MU_FAILURE;
1038 }
1039 else
1040 {
1041 STRING_HANDLE trace_log = construct_trace_log_handle(mqtt_client);
1042
1043 QOS_VALUE qos = mqttmessage_getQosType(msgHandle);
1044 bool isDuplicate = mqttmessage_getIsDuplicateMsg(msgHandle);
1045 bool isRetained = mqttmessage_getIsRetained(msgHandle);
1046 uint16_t packetId = mqttmessage_getPacketId(msgHandle);
1047 const char* topicName = mqttmessage_getTopicName(msgHandle);
1048 BUFFER_HANDLE publishPacket = mqtt_codec_publish(qos, isDuplicate, isRetained, packetId, topicName, payload->message, payload->length, trace_log);
1049 if (publishPacket == NULL)
1050 {
1051 /*Codes_SRS_MQTT_CLIENT_07_020: [If any failure is encountered then mqtt_client_unsubscribe shall return a non-zero value.]*/
1052 LogError("Error: mqtt_codec_publish failed");
1053 result = MU_FAILURE;
1054 }
1055 else
1056 {
1057 mqtt_client->packetState = PUBLISH_TYPE;
1058
1059 /*Codes_SRS_MQTT_CLIENT_07_022: [On success mqtt_client_publish shall send the MQTT SUBCRIBE packet to the endpoint.]*/
1060 size_t size = BUFFER_length(publishPacket);
1061 if (sendPacketItem(mqtt_client, BUFFER_u_char(publishPacket), size) != 0)
1062 {
1063 /*Codes_SRS_MQTT_CLIENT_07_020: [If any failure is encountered then mqtt_client_unsubscribe shall return a non-zero value.]*/
1064 LogError("Error: mqtt_client_publish send failed");
1065 result = MU_FAILURE;
1066 }
1067 else
1068 {
1069 log_outgoing_trace(mqtt_client, trace_log);
1070 result = 0;
1071 }
1072 BUFFER_delete(publishPacket);
1073 }
1074 if (trace_log != NULL)
1075 {
1076 STRING_delete(trace_log);
1077 }
1078 }
1079 }
1080 return result;
1081}
1082
1083int mqtt_client_subscribe(MQTT_CLIENT_HANDLE handle, uint16_t packetId, SUBSCRIBE_PAYLOAD* subscribeList, size_t count)
1084{
1085 int result;
1086 MQTT_CLIENT* mqtt_client = (MQTT_CLIENT*)handle;
1087 if (mqtt_client == NULL || subscribeList == NULL || count == 0 || packetId == 0)
1088 {
1089 /*Codes_SRS_MQTT_CLIENT_07_013: [If any of the parameters handle, subscribeList is NULL or count is 0 then mqtt_client_subscribe shall return a non-zero value.]*/
1090 LogError("Invalid parameter specified mqtt_client: %p, subscribeList: %p, count: %lu, packetId: %d", mqtt_client, subscribeList, (unsigned long)count, packetId);
1091 result = MU_FAILURE;
1092 }
1093 else
1094 {
1095 STRING_HANDLE trace_log = construct_trace_log_handle(mqtt_client);
1096
1097 BUFFER_HANDLE subPacket = mqtt_codec_subscribe(packetId, subscribeList, count, trace_log);
1098 if (subPacket == NULL)
1099 {
1100 /*Codes_SRS_MQTT_CLIENT_07_014: [If any failure is encountered then mqtt_client_subscribe shall return a non-zero value.]*/
1101 LogError("Error: mqtt_codec_subscribe failed");
1102 result = MU_FAILURE;
1103 }
1104 else
1105 {
1106 mqtt_client->packetState = SUBSCRIBE_TYPE;
1107
1108 size_t size = BUFFER_length(subPacket);
1109 /*Codes_SRS_MQTT_CLIENT_07_015: [On success mqtt_client_subscribe shall send the MQTT SUBCRIBE packet to the endpoint.]*/
1110 if (sendPacketItem(mqtt_client, BUFFER_u_char(subPacket), size) != 0)
1111 {
1112 /*Codes_SRS_MQTT_CLIENT_07_014: [If any failure is encountered then mqtt_client_subscribe shall return a non-zero value.]*/
1113 LogError("Error: mqtt_client_subscribe send failed");
1114 result = MU_FAILURE;
1115 }
1116 else
1117 {
1118 log_outgoing_trace(mqtt_client, trace_log);
1119 result = 0;
1120 }
1121 BUFFER_delete(subPacket);
1122 }
1123 if (trace_log != NULL)
1124 {
1125 STRING_delete(trace_log);
1126 }
1127 }
1128 return result;
1129}
1130
1131int mqtt_client_unsubscribe(MQTT_CLIENT_HANDLE handle, uint16_t packetId, const char** unsubscribeList, size_t count)
1132{
1133 int result;
1134 MQTT_CLIENT* mqtt_client = (MQTT_CLIENT*)handle;
1135 if (mqtt_client == NULL || unsubscribeList == NULL || count == 0 || packetId == 0)
1136 {
1137 /*Codes_SRS_MQTT_CLIENT_07_016: [If any of the parameters handle, unsubscribeList is NULL or count is 0 then mqtt_client_unsubscribe shall return a non-zero value.]*/
1138 LogError("Invalid parameter specified mqtt_client: %p, unsubscribeList: %p, count: %lu, packetId: %d", mqtt_client, unsubscribeList, (unsigned long)count, packetId);
1139 result = MU_FAILURE;
1140 }
1141 else
1142 {
1143 STRING_HANDLE trace_log = construct_trace_log_handle(mqtt_client);
1144
1145 BUFFER_HANDLE unsubPacket = mqtt_codec_unsubscribe(packetId, unsubscribeList, count, trace_log);
1146 if (unsubPacket == NULL)
1147 {
1148 /*Codes_SRS_MQTT_CLIENT_07_017: [If any failure is encountered then mqtt_client_unsubscribe shall return a non-zero value.]*/
1149 LogError("Error: mqtt_codec_unsubscribe failed");
1150 result = MU_FAILURE;
1151 }
1152 else
1153 {
1154 mqtt_client->packetState = UNSUBSCRIBE_TYPE;
1155
1156 size_t size = BUFFER_length(unsubPacket);
1157 /*Codes_SRS_MQTT_CLIENT_07_018: [On success mqtt_client_unsubscribe shall send the MQTT SUBCRIBE packet to the endpoint.]*/
1158 if (sendPacketItem(mqtt_client, BUFFER_u_char(unsubPacket), size) != 0)
1159 {
1160 /*Codes_SRS_MQTT_CLIENT_07_017: [If any failure is encountered then mqtt_client_unsubscribe shall return a non-zero value.].]*/
1161 LogError("Error: mqtt_client_unsubscribe send failed");
1162 result = MU_FAILURE;
1163 }
1164 else
1165 {
1166 log_outgoing_trace(mqtt_client, trace_log);
1167 result = 0;
1168 }
1169 BUFFER_delete(unsubPacket);
1170 }
1171 if (trace_log != NULL)
1172 {
1173 STRING_delete(trace_log);
1174 }
1175 }
1176 return result;
1177}
1178
1179int mqtt_client_disconnect(MQTT_CLIENT_HANDLE handle, ON_MQTT_DISCONNECTED_CALLBACK callback, void* ctx)
1180{
1181 int result;
1182 MQTT_CLIENT* mqtt_client = (MQTT_CLIENT*)handle;
1183 if (mqtt_client == NULL)
1184 {
1185 /*Codes_SRS_MQTT_CLIENT_07_010: [If the parameters handle is NULL then mqtt_client_disconnect shall return a non-zero value.]*/
1186 result = MU_FAILURE;
1187 }
1188 else
1189 {
1190 if (mqtt_client->clientConnected)
1191 {
1192 BUFFER_HANDLE disconnectPacket = mqtt_codec_disconnect();
1193 if (disconnectPacket == NULL)
1194 {
1195 /*Codes_SRS_MQTT_CLIENT_07_011: [If any failure is encountered then mqtt_client_disconnect shall return a non-zero value.]*/
1196 LogError("Error: mqtt_client_disconnect failed");
1197 mqtt_client->packetState = PACKET_TYPE_ERROR;
1198 result = MU_FAILURE;
1199 }
1200 else
1201 {
1202 /* Codes_SRS_MQTT_CLIENT_07_037: [ if callback is not NULL callback shall be called once the mqtt connection has been disconnected ] */
1203 mqtt_client->disconnect_cb = callback;
1204 mqtt_client->disconnect_ctx = ctx;
1205 mqtt_client->packetState = DISCONNECT_TYPE;
1206
1207 size_t size = BUFFER_length(disconnectPacket);
1208 /*Codes_SRS_MQTT_CLIENT_07_012: [On success mqtt_client_disconnect shall send the MQTT DISCONNECT packet to the endpoint.]*/
1209 if (sendPacketItem(mqtt_client, BUFFER_u_char(disconnectPacket), size) != 0)
1210 {
1211 /*Codes_SRS_MQTT_CLIENT_07_011: [If any failure is encountered then mqtt_client_disconnect shall return a non-zero value.]*/
1212 LogError("Error: mqtt_client_disconnect send failed");
1213 result = MU_FAILURE;
1214 }
1215 else
1216 {
1217 if (mqtt_client->logTrace)
1218 {
1219 STRING_HANDLE trace_log = STRING_construct("DISCONNECT");
1220 log_outgoing_trace(mqtt_client, trace_log);
1221 STRING_delete(trace_log);
1222 }
1223 result = 0;
1224 }
1225 BUFFER_delete(disconnectPacket);
1226 }
1227 clear_mqtt_options(mqtt_client);
1228 }
1229 else
1230 {
1231 // If the client is not connected then just close the underlying socket
1232 mqtt_client->disconnect_cb = callback;
1233 mqtt_client->disconnect_ctx = ctx;
1234
1235 close_connection(mqtt_client);
1236 clear_mqtt_options(mqtt_client);
1237 result = 0;
1238 }
1239 }
1240 return result;
1241}
1242
1243void mqtt_client_dowork(MQTT_CLIENT_HANDLE handle)
1244{
1245 MQTT_CLIENT* mqtt_client = (MQTT_CLIENT*)handle;
1246 /*Codes_SRS_MQTT_CLIENT_18_001: [If the client is disconnected, mqtt_client_dowork shall do nothing.]*/
1247 /*Codes_SRS_MQTT_CLIENT_07_023: [If the parameter handle is NULL then mqtt_client_dowork shall do nothing.]*/
1248 if (mqtt_client != NULL && mqtt_client->xioHandle != NULL)
1249 {
1250 /*Codes_SRS_MQTT_CLIENT_07_024: [mqtt_client_dowork shall call the xio_dowork function to complete operations.]*/
1251 xio_dowork(mqtt_client->xioHandle);
1252
1253 /*Codes_SRS_MQTT_CLIENT_07_025: [mqtt_client_dowork shall retrieve the the last packet send value and ...]*/
1254 if (mqtt_client->socketConnected && mqtt_client->clientConnected && mqtt_client->keepAliveInterval > 0)
1255 {
1256 tickcounter_ms_t current_ms;
1257 if (tickcounter_get_current_ms(mqtt_client->packetTickCntr, &current_ms) != 0)
1258 {
1259 LogError("Error: tickcounter_get_current_ms failed");
1260 }
1261 else
1262 {
1263 /* Codes_SRS_MQTT_CLIENT_07_035: [If the timeSincePing has expired past the maxPingRespTime then mqtt_client_dowork shall call the Error Callback function with the message MQTT_CLIENT_NO_PING_RESPONSE] */
1264 if (mqtt_client->timeSincePing > 0 && ((current_ms - mqtt_client->timeSincePing)/1000) > mqtt_client->maxPingRespTime)
1265 {
1266 // We haven't gotten a ping response in the alloted time
1267 set_error_callback(mqtt_client, MQTT_CLIENT_NO_PING_RESPONSE);
1268 mqtt_client->timeSincePing = 0;
1269 mqtt_client->packetSendTimeMs = 0;
1270 mqtt_client->packetState = UNKNOWN_TYPE;
1271 }
1272 else if (((current_ms - mqtt_client->packetSendTimeMs) / 1000) >= mqtt_client->keepAliveInterval)
1273 {
1274 /*Codes_SRS_MQTT_CLIENT_07_026: [if keepAliveInternal is > 0 and the send time is greater than the MQTT KeepAliveInterval then it shall construct an MQTT PINGREQ packet.]*/
1275 BUFFER_HANDLE pingPacket = mqtt_codec_ping();
1276 if (pingPacket != NULL)
1277 {
1278 size_t size = BUFFER_length(pingPacket);
1279 (void)sendPacketItem(mqtt_client, BUFFER_u_char(pingPacket), size);
1280 BUFFER_delete(pingPacket);
1281 (void)tickcounter_get_current_ms(mqtt_client->packetTickCntr, &mqtt_client->timeSincePing);
1282
1283 if (mqtt_client->logTrace)
1284 {
1285 STRING_HANDLE trace_log = STRING_construct("PINGREQ");
1286 log_outgoing_trace(mqtt_client, trace_log);
1287 STRING_delete(trace_log);
1288 }
1289 }
1290 }
1291 }
1292 }
1293 }
1294}
1295
1296void mqtt_client_set_trace(MQTT_CLIENT_HANDLE handle, bool traceOn, bool rawBytesOn)
1297{
1298 AZURE_UNREFERENCED_PARAMETER(handle);
1299 AZURE_UNREFERENCED_PARAMETER(traceOn);
1300 AZURE_UNREFERENCED_PARAMETER(rawBytesOn);
1301#ifndef NO_LOGGING
1302 MQTT_CLIENT* mqtt_client = (MQTT_CLIENT*)handle;
1303 if (mqtt_client != NULL)
1304 {
1305 mqtt_client->logTrace = traceOn;
1306#ifdef ENABLE_RAW_TRACE
1307 mqtt_client->rawBytesTrace = rawBytesOn;
1308#endif
1309 }
1310#endif
1311}
Note: See TracBrowser for help on using the repository browser.