source: azure_iot_hub_f767zi/trunk/azure_iot_sdk/umqtt/src/mqtt_client.c@ 464

Last change on this file since 464 was 464, checked in by coas-nagasima, 3 years ago

WolfSSLとAzure IoT SDKを更新

  • Property svn:eol-style set to native
  • Property svn:mime-type set to text/x-csrc;charset=UTF-8
File size: 53.9 KB
Line 
1// Copyright (c) Microsoft. All rights reserved.
2// Licensed under the MIT license. See LICENSE file in the project root for full license information.
3
4#include <stdlib.h>
5#include <stdbool.h>
6#include "azure_c_shared_utility/agenttime.h"
7#include "azure_c_shared_utility/const_defines.h"
8#include "azure_c_shared_utility/crt_abstractions.h"
9#include "azure_c_shared_utility/gballoc.h"
10#include "azure_c_shared_utility/optimize_size.h"
11#include "azure_macro_utils/macro_utils.h"
12#include "azure_c_shared_utility/platform.h"
13#include "azure_c_shared_utility/strings.h"
14#include "azure_c_shared_utility/threadapi.h"
15#include "azure_c_shared_utility/tickcounter.h"
16#include "azure_c_shared_utility/xlogging.h"
17
18#include "azure_umqtt_c/mqtt_client.h"
19#include "azure_umqtt_c/mqtt_codec.h"
20#include <inttypes.h>
21
22#define VARIABLE_HEADER_OFFSET 2
23#define RETAIN_FLAG_MASK 0x1
24#define QOS_LEAST_ONCE_FLAG_MASK 0x2
25#define QOS_EXACTLY_ONCE_FLAG_MASK 0x4
26#define DUPLICATE_FLAG_MASK 0x8
27#define CONNECT_PACKET_MASK 0xf0
28#define TIME_MAX_BUFFER 16
29#define DEFAULT_MAX_PING_RESPONSE_TIME 80 // % of time to send pings
30#define MAX_CLOSE_RETRIES 20
31
32static const char* const TRUE_CONST = "true";
33static const char* const FALSE_CONST = "false";
34
35MU_DEFINE_ENUM_STRINGS(QOS_VALUE, QOS_VALUE_VALUES);
36
37#define MQTT_STATUS_INITIAL_STATUS 0x0000
38#define MQTT_STATUS_CLIENT_CONNECTED 0x0001
39#define MQTT_STATUS_SOCKET_CONNECTED 0x0002
40#define MQTT_STATUS_PENDING_CLOSE 0x0004
41
42#define MQTT_FLAGS_LOG_TRACE 0x0001
43#define MQTT_FLAGS_RAW_TRACE 0x0002
44
45typedef struct MQTT_CLIENT_TAG
46{
47 XIO_HANDLE xioHandle;
48 MQTTCODEC_HANDLE codec_handle;
49 CONTROL_PACKET_TYPE packetState;
50 TICK_COUNTER_HANDLE packetTickCntr;
51 tickcounter_ms_t packetSendTimeMs;
52 ON_MQTT_OPERATION_CALLBACK fnOperationCallback;
53 ON_MQTT_MESSAGE_RECV_CALLBACK fnMessageRecv;
54 void* ctx;
55 ON_MQTT_ERROR_CALLBACK fnOnErrorCallBack;
56 void* errorCBCtx;
57 ON_MQTT_DISCONNECTED_CALLBACK disconnect_cb;
58 void* disconnect_ctx;
59 QOS_VALUE qosValue;
60 uint16_t keepAliveInterval;
61 MQTT_CLIENT_OPTIONS mqttOptions;
62
63 uint16_t mqtt_status;
64 uint16_t mqtt_flags;
65
66 tickcounter_ms_t timeSincePing;
67 uint16_t maxPingRespTime;
68} MQTT_CLIENT;
69
70static bool is_trace_enabled(MQTT_CLIENT* mqtt_client)
71{
72 return (mqtt_client->mqtt_flags & MQTT_FLAGS_LOG_TRACE);
73}
74
75#ifdef ENABLE_RAW_TRACE
76static bool is_raw_trace_enabled(MQTT_CLIENT* mqtt_client)
77{
78 return (mqtt_client->mqtt_flags & MQTT_FLAGS_RAW_TRACE);
79}
80#endif // ENABLE_RAW_TRACE
81
82static void on_connection_closed(void* context)
83{
84 MQTT_CLIENT* mqtt_client = (MQTT_CLIENT*)context;
85 if (mqtt_client != NULL)
86 {
87 mqtt_client->mqtt_status &= ~MQTT_STATUS_SOCKET_CONNECTED;
88 if (mqtt_client->disconnect_cb)
89 {
90 mqtt_client->disconnect_cb(mqtt_client->disconnect_ctx);
91 }
92 }
93}
94
95static void close_connection(MQTT_CLIENT* mqtt_client)
96{
97 if (mqtt_client->mqtt_status & MQTT_STATUS_SOCKET_CONNECTED)
98 {
99 (void)xio_close(mqtt_client->xioHandle, on_connection_closed, mqtt_client);
100 if (mqtt_client->disconnect_cb == NULL)
101 {
102 size_t counter = 0;
103 do
104 {
105 xio_dowork(mqtt_client->xioHandle);
106 counter++;
107 ThreadAPI_Sleep(2);
108 } while (mqtt_client->mqtt_status & MQTT_STATUS_SOCKET_CONNECTED && counter < MAX_CLOSE_RETRIES);
109 }
110 // Clear the handle because we don't use it anymore
111 mqtt_client->xioHandle = NULL;
112 }
113 else
114 {
115 mqtt_client->mqtt_status &= ~MQTT_STATUS_SOCKET_CONNECTED;
116 if (mqtt_client->disconnect_cb)
117 {
118 mqtt_client->disconnect_cb(mqtt_client->disconnect_ctx);
119 }
120 }
121}
122
123static void set_error_callback(MQTT_CLIENT* mqtt_client, MQTT_CLIENT_EVENT_ERROR error_type)
124{
125 if (mqtt_client->fnOnErrorCallBack)
126 {
127 mqtt_client->fnOnErrorCallBack(mqtt_client, error_type, mqtt_client->errorCBCtx);
128 }
129 close_connection(mqtt_client);
130}
131
132static STRING_HANDLE construct_trace_log_handle(MQTT_CLIENT* mqtt_client)
133{
134 STRING_HANDLE trace_log;
135 if (is_trace_enabled(mqtt_client) )
136 {
137 trace_log = STRING_new();
138 }
139 else
140 {
141 trace_log = NULL;
142 }
143 return trace_log;
144}
145
146static uint16_t byteutil_read_uint16(uint8_t** buffer, size_t byteLen)
147{
148 uint16_t result = 0;
149 if (buffer != NULL && *buffer != NULL && byteLen >= 2)
150 {
151 result = 256 * (**buffer) + (*(*buffer + 1));
152 *buffer += 2; // Move the ptr
153 }
154 else
155 {
156 LogError("byteutil_read_uint16 == NULL or less than 2");
157 }
158 return result;
159}
160
161static char* byteutil_readUTF(uint8_t** buffer, size_t* byteLen)
162{
163 char* result = NULL;
164
165 const uint8_t* bufferInitial = *buffer;
166 // Get the length of the string
167 uint16_t stringLen = byteutil_read_uint16(buffer, *byteLen);
168 // Verify that byteutil_read_uint16 succeeded (by stringLen>0) and that we're
169 // not being asked to read a string longer than buffer passed in.
170 if ((stringLen > 0) && ((size_t)(stringLen + (*buffer - bufferInitial)) <= *byteLen))
171 {
172 result = (char*)malloc(stringLen + 1);
173 if (result != NULL)
174 {
175 (void)memcpy(result, *buffer, stringLen);
176 result[stringLen] = '\0';
177 *buffer += stringLen;
178 *byteLen = stringLen;
179 }
180 }
181 else
182 {
183 LogError("String passed not a valid UTF.");
184 }
185
186 return result;
187}
188
189static uint8_t byteutil_readByte(uint8_t** buffer)
190{
191 uint8_t result = 0;
192 if (buffer != NULL)
193 {
194 result = **buffer;
195 (*buffer)++;
196 }
197 else
198 {
199 LogError("readByte buffer == NULL.");
200 }
201 return result;
202}
203
204static void sendComplete(void* context, IO_SEND_RESULT send_result)
205{
206 MQTT_CLIENT* mqtt_client = (MQTT_CLIENT*)context;
207 if (mqtt_client != NULL)
208 {
209 if (send_result == IO_SEND_OK)
210 {
211 if (mqtt_client->packetState == DISCONNECT_TYPE)
212 {
213 /*Codes_SRS_MQTT_CLIENT_07_032: [If the actionResult parameter is of type MQTT_CLIENT_ON_DISCONNECT the the msgInfo value shall be NULL.]*/
214 if (mqtt_client->fnOperationCallback != NULL)
215 {
216 mqtt_client->fnOperationCallback(mqtt_client, MQTT_CLIENT_ON_DISCONNECT, NULL, mqtt_client->ctx);
217 }
218
219 // Mark to close
220 mqtt_client->mqtt_status |= MQTT_STATUS_PENDING_CLOSE;
221 }
222 }
223 else if (send_result == IO_SEND_ERROR)
224 {
225 LogError("MQTT Send Complete Failure send_result: %d", (int)send_result);
226 set_error_callback(mqtt_client, MQTT_CLIENT_COMMUNICATION_ERROR);
227 }
228 }
229 else
230 {
231 LogError("MQTT Send Complete Failure with NULL mqtt_client");
232 }
233}
234
235#ifndef NO_LOGGING
236static void getLogTime(char* timeResult, size_t len)
237{
238 if (timeResult != NULL)
239 {
240 time_t agent_time = get_time(NULL);
241 if (agent_time == (time_t)-1)
242 {
243 timeResult[0] = '\0';
244 }
245 else
246 {
247 struct tm* tmInfo = localtime(&agent_time);
248 if (tmInfo == NULL)
249 {
250 timeResult[0] = '\0';
251 }
252 else
253 {
254 if (strftime(timeResult, len, "%H:%M:%S", tmInfo) == 0)
255 {
256 timeResult[0] = '\0';
257 }
258 }
259 }
260 }
261}
262
263#ifdef ENABLE_RAW_TRACE
264static const char* retrievePacketType(CONTROL_PACKET_TYPE packet)
265{
266 switch (packet&CONNECT_PACKET_MASK)
267 {
268 case CONNECT_TYPE: return "CONNECT";
269 case CONNACK_TYPE: return "CONNACK";
270 case PUBLISH_TYPE: return "PUBLISH";
271 case PUBACK_TYPE: return "PUBACK";
272 case PUBREC_TYPE: return "PUBREC";
273 case PUBREL_TYPE: return "PUBREL";
274 case SUBSCRIBE_TYPE: return "SUBSCRIBE";
275 case SUBACK_TYPE: return "SUBACK";
276 case UNSUBSCRIBE_TYPE: return "UNSUBSCRIBE";
277 case UNSUBACK_TYPE: return "UNSUBACK";
278 case PINGREQ_TYPE: return "PINGREQ";
279 case PINGRESP_TYPE: return "PINGRESP";
280 case DISCONNECT_TYPE: return "DISCONNECT";
281 default:
282 case PACKET_TYPE_ERROR:
283 case UNKNOWN_TYPE:
284 return "UNKNOWN";
285 }
286}
287
288static void logOutgoingRawTrace(MQTT_CLIENT* mqtt_client, const uint8_t* data, size_t length)
289{
290 if (mqtt_client != NULL && data != NULL && length > 0 && is_raw_trace_enabled(mqtt_client))
291 {
292 char tmBuffer[TIME_MAX_BUFFER];
293 getLogTime(tmBuffer, TIME_MAX_BUFFER);
294
295 LOG(AZ_LOG_TRACE, 0, "-> %s %s: ", tmBuffer, retrievePacketType((unsigned char)data[0]));
296 size_t index = 0;
297 for (index = 0; index < length; index++)
298 {
299 LOG(AZ_LOG_TRACE, 0, "0x%02x ", data[index]);
300 }
301 LOG(AZ_LOG_TRACE, LOG_LINE, " ");
302 }
303}
304
305static void logIncomingRawTrace(MQTT_CLIENT* mqtt_client, CONTROL_PACKET_TYPE packet, uint8_t flags, const uint8_t* data, size_t length)
306{
307 if (mqtt_client != NULL && is_raw_trace_enabled(mqtt_client))
308 {
309 if (data != NULL && length > 0)
310 {
311 char tmBuffer[TIME_MAX_BUFFER];
312 getLogTime(tmBuffer, TIME_MAX_BUFFER);
313
314 LOG(AZ_LOG_TRACE, 0, "<- %s %s: 0x%02x 0x%02x ", tmBuffer, retrievePacketType((CONTROL_PACKET_TYPE)packet), (unsigned int)(packet | flags), (unsigned int)length);
315 size_t index = 0;
316 for (index = 0; index < length; index++)
317 {
318 LOG(AZ_LOG_TRACE, 0, "0x%02x ", data[index]);
319 }
320 LOG(AZ_LOG_TRACE, LOG_LINE, " ");
321 }
322 else if (packet == PINGRESP_TYPE)
323 {
324 char tmBuffer[TIME_MAX_BUFFER];
325 getLogTime(tmBuffer, TIME_MAX_BUFFER);
326 LOG(AZ_LOG_TRACE, LOG_LINE, "<- %s %s: 0x%02x 0x%02x ", tmBuffer, retrievePacketType((CONTROL_PACKET_TYPE)packet), (unsigned int)(packet | flags), (unsigned int)length);
327 }
328 }
329}
330#endif // ENABLE_RAW_TRACE
331
332static void log_outgoing_trace(MQTT_CLIENT* mqtt_client, STRING_HANDLE trace_log)
333{
334 if (mqtt_client != NULL && is_trace_enabled(mqtt_client) && trace_log != NULL)
335 {
336 char tmBuffer[TIME_MAX_BUFFER];
337 getLogTime(tmBuffer, TIME_MAX_BUFFER);
338 LOG(AZ_LOG_TRACE, LOG_LINE, "-> %s %s", tmBuffer, STRING_c_str(trace_log));
339 }
340}
341
342static void log_incoming_trace(MQTT_CLIENT* mqtt_client, STRING_HANDLE trace_log)
343{
344 if (mqtt_client != NULL && is_trace_enabled(mqtt_client) && trace_log != NULL)
345 {
346 char tmBuffer[TIME_MAX_BUFFER];
347 getLogTime(tmBuffer, TIME_MAX_BUFFER);
348 LOG(AZ_LOG_TRACE, LOG_LINE, "<- %s %s", tmBuffer, STRING_c_str(trace_log) );
349 }
350}
351#else // NO_LOGGING
352static void logOutgoingRawTrace(MQTT_CLIENT* mqtt_client, const uint8_t* data, size_t length)
353{
354 AZURE_UNREFERENCED_PARAMETER(mqtt_client);
355 AZURE_UNREFERENCED_PARAMETER(data);
356 AZURE_UNREFERENCED_PARAMETER(length);
357}
358
359static void log_outgoing_trace(MQTT_CLIENT* mqtt_client, STRING_HANDLE trace_log)
360{
361 AZURE_UNREFERENCED_PARAMETER(mqtt_client);
362 AZURE_UNREFERENCED_PARAMETER(trace_log);
363}
364
365static void log_incoming_trace(MQTT_CLIENT* mqtt_client, STRING_HANDLE trace_log)
366{
367 AZURE_UNREFERENCED_PARAMETER(mqtt_client);
368 AZURE_UNREFERENCED_PARAMETER(trace_log);
369}
370
371static void logIncomingRawTrace(MQTT_CLIENT* mqtt_client, CONTROL_PACKET_TYPE packet, uint8_t flags, const uint8_t* data, size_t length)
372{
373 AZURE_UNREFERENCED_PARAMETER(mqtt_client);
374 AZURE_UNREFERENCED_PARAMETER(packet);
375 AZURE_UNREFERENCED_PARAMETER(flags);
376 AZURE_UNREFERENCED_PARAMETER(data);
377 AZURE_UNREFERENCED_PARAMETER(length);
378}
379#endif // NO_LOGGING
380
381static int sendPacketItem(MQTT_CLIENT* mqtt_client, const unsigned char* data, size_t length)
382{
383 int result;
384
385 if (tickcounter_get_current_ms(mqtt_client->packetTickCntr, &mqtt_client->packetSendTimeMs) != 0)
386 {
387 LogError("Failure getting current ms tickcounter");
388 result = MU_FAILURE;
389 }
390 else
391 {
392 result = xio_send(mqtt_client->xioHandle, (const void*)data, length, sendComplete, mqtt_client);
393 if (result != 0)
394 {
395 LogError("Failure sending control packet data");
396 result = MU_FAILURE;
397 }
398 else
399 {
400#ifdef ENABLE_RAW_TRACE
401 logOutgoingRawTrace(mqtt_client, (const uint8_t*)data, length);
402#endif
403 }
404 }
405 return result;
406}
407
408static void onOpenComplete(void* context, IO_OPEN_RESULT open_result)
409{
410 MQTT_CLIENT* mqtt_client = (MQTT_CLIENT*)context;
411 if (mqtt_client != NULL)
412 {
413 if (open_result == IO_OPEN_OK && !(mqtt_client->mqtt_status & MQTT_STATUS_SOCKET_CONNECTED))
414 {
415 mqtt_client->packetState = CONNECT_TYPE;
416 mqtt_client->mqtt_status |= MQTT_STATUS_SOCKET_CONNECTED;
417
418 STRING_HANDLE trace_log = construct_trace_log_handle(mqtt_client);
419
420 // Send the Connect packet
421 BUFFER_HANDLE connPacket = mqtt_codec_connect(&mqtt_client->mqttOptions, trace_log);
422 if (connPacket == NULL)
423 {
424 LogError("Error: mqtt_codec_connect failed");
425 }
426 else
427 {
428 size_t size = BUFFER_length(connPacket);
429 /*Codes_SRS_MQTT_CLIENT_07_009: [On success mqtt_client_connect shall send the MQTT CONNECT to the endpoint.]*/
430 if (sendPacketItem(mqtt_client, BUFFER_u_char(connPacket), size) != 0)
431 {
432 LogError("Error: failure sending connect packet");
433 // Set the status to pending close because we connot continue
434 // with CONN failing to send
435 if (mqtt_client->fnOnErrorCallBack)
436 {
437 mqtt_client->fnOnErrorCallBack(mqtt_client, MQTT_CLIENT_CONNECTION_ERROR, mqtt_client->errorCBCtx);
438 }
439 mqtt_client->mqtt_status |= MQTT_STATUS_PENDING_CLOSE;
440 }
441 else
442 {
443 log_outgoing_trace(mqtt_client, trace_log);
444 }
445 BUFFER_delete(connPacket);
446 }
447 if (trace_log != NULL)
448 {
449 STRING_delete(trace_log);
450 }
451 }
452 else
453 {
454 LogError("Error: failure opening connection to endpoint");
455 if (!(mqtt_client->mqtt_status & MQTT_STATUS_SOCKET_CONNECTED) && mqtt_client->fnOnErrorCallBack)
456 {
457 mqtt_client->fnOnErrorCallBack(mqtt_client, MQTT_CLIENT_CONNECTION_ERROR, mqtt_client->errorCBCtx);
458 }
459 mqtt_client->mqtt_status |= MQTT_STATUS_PENDING_CLOSE;
460 }
461 }
462 else
463 {
464 LogError("Error: mqtt_client is NULL");
465 }
466}
467
468static void onBytesReceived(void* context, const unsigned char* buffer, size_t size)
469{
470 MQTT_CLIENT* mqtt_client = (MQTT_CLIENT*)context;
471 if (mqtt_client != NULL)
472 {
473 if (mqtt_codec_bytesReceived(mqtt_client->codec_handle, buffer, size) != 0)
474 {
475 mqtt_codec_reset(mqtt_client->codec_handle);
476 set_error_callback(mqtt_client, MQTT_CLIENT_PARSE_ERROR);
477 }
478 }
479 else
480 {
481 LogError("Error: mqtt_client is NULL");
482 }
483}
484
485static void onIoError(void* context)
486{
487 MQTT_CLIENT* mqtt_client = (MQTT_CLIENT*)context;
488 if (mqtt_client != NULL && mqtt_client->fnOperationCallback)
489 {
490 /*Codes_SRS_MQTT_CLIENT_07_032: [If the actionResult parameter is of type MQTT_CLIENT_ON_DISCONNECT the the msgInfo value shall be NULL.]*/
491 /* Codes_SRS_MQTT_CLIENT_07_036: [ If an error is encountered by the ioHandle the mqtt_client shall call xio_close. ] */
492 set_error_callback(mqtt_client, MQTT_CLIENT_CONNECTION_ERROR);
493 }
494 else
495 {
496 LogError("Error invalid parameter: mqtt_client: %p", mqtt_client);
497 }
498}
499
500static void clear_mqtt_options(MQTT_CLIENT* mqtt_client)
501{
502 if (mqtt_client->mqttOptions.clientId != NULL)
503 {
504 free(mqtt_client->mqttOptions.clientId);
505 mqtt_client->mqttOptions.clientId = NULL;
506 }
507
508 if (mqtt_client->mqttOptions.willTopic != NULL)
509 {
510 free(mqtt_client->mqttOptions.willTopic);
511 mqtt_client->mqttOptions.willTopic = NULL;
512 }
513
514 if (mqtt_client->mqttOptions.willMessage != NULL)
515 {
516 free(mqtt_client->mqttOptions.willMessage);
517 mqtt_client->mqttOptions.willMessage = NULL;
518 }
519
520 if (mqtt_client->mqttOptions.username != NULL)
521 {
522 free(mqtt_client->mqttOptions.username);
523 mqtt_client->mqttOptions.username = NULL;
524 }
525
526 if (mqtt_client->mqttOptions.password != NULL)
527 {
528 free(mqtt_client->mqttOptions.password);
529 mqtt_client->mqttOptions.password = NULL;
530 }
531}
532
533static int cloneMqttOptions(MQTT_CLIENT* mqtt_client, const MQTT_CLIENT_OPTIONS* mqttOptions)
534{
535 int result = 0;
536 char* temp_option;
537
538 if (mqttOptions->clientId != NULL)
539 {
540 if (mallocAndStrcpy_s(&temp_option, mqttOptions->clientId) != 0)
541 {
542 result = MU_FAILURE;
543 LogError("mallocAndStrcpy_s clientId");
544 }
545 else
546 {
547 if (mqtt_client->mqttOptions.clientId != NULL)
548 {
549 free(mqtt_client->mqttOptions.clientId);
550 }
551 mqtt_client->mqttOptions.clientId = temp_option;
552 }
553 }
554 if (result == 0 && mqttOptions->willTopic != NULL)
555 {
556 temp_option = NULL;
557 if (mallocAndStrcpy_s(&temp_option, mqttOptions->willTopic) != 0)
558 {
559 result = MU_FAILURE;
560 LogError("mallocAndStrcpy_s willTopic");
561 }
562 else
563 {
564 if (mqtt_client->mqttOptions.willTopic != NULL)
565 {
566 free(mqtt_client->mqttOptions.willTopic);
567 }
568 mqtt_client->mqttOptions.willTopic = temp_option;
569 }
570 }
571 if (result == 0 && mqttOptions->willMessage != NULL)
572 {
573 temp_option = NULL;
574 if (mallocAndStrcpy_s(&temp_option, mqttOptions->willMessage) != 0)
575 {
576 LogError("mallocAndStrcpy_s willMessage");
577 result = MU_FAILURE;
578 }
579 else
580 {
581 if (mqtt_client->mqttOptions.willMessage != NULL)
582 {
583 free(mqtt_client->mqttOptions.willMessage);
584 }
585 mqtt_client->mqttOptions.willMessage = temp_option;
586 }
587 }
588 if (result == 0 && mqttOptions->username != NULL)
589 {
590 temp_option = NULL;
591 if (mallocAndStrcpy_s(&temp_option, mqttOptions->username) != 0)
592 {
593 LogError("mallocAndStrcpy_s username");
594 result = MU_FAILURE;
595 }
596 else
597 {
598 if (mqtt_client->mqttOptions.username != NULL)
599 {
600 free(mqtt_client->mqttOptions.username);
601 }
602 mqtt_client->mqttOptions.username = temp_option;
603 }
604 }
605 if (result == 0 && mqttOptions->password != NULL)
606 {
607 temp_option = NULL;
608 if (mallocAndStrcpy_s(&temp_option, mqttOptions->password) != 0)
609 {
610 LogError("mallocAndStrcpy_s password");
611 result = MU_FAILURE;
612 }
613 else
614 {
615 if (mqtt_client->mqttOptions.password != NULL)
616 {
617 free(mqtt_client->mqttOptions.password);
618 }
619 mqtt_client->mqttOptions.password = temp_option;
620 }
621 }
622 if (result == 0)
623 {
624 mqtt_client->mqttOptions.keepAliveInterval = mqttOptions->keepAliveInterval;
625 mqtt_client->mqttOptions.messageRetain = mqttOptions->messageRetain;
626 mqtt_client->mqttOptions.useCleanSession = mqttOptions->useCleanSession;
627 mqtt_client->mqttOptions.qualityOfServiceValue = mqttOptions->qualityOfServiceValue;
628 }
629 else
630 {
631 clear_mqtt_options(mqtt_client);
632 }
633 return result;
634}
635
636static void ProcessPublishMessage(MQTT_CLIENT* mqtt_client, uint8_t* initialPos, size_t packetLength, int flags)
637{
638 bool isDuplicateMsg = (flags & DUPLICATE_FLAG_MASK) ? true : false;
639 bool isRetainMsg = (flags & RETAIN_FLAG_MASK) ? true : false;
640 QOS_VALUE qosValue = (flags == 0) ? DELIVER_AT_MOST_ONCE : (flags & QOS_LEAST_ONCE_FLAG_MASK) ? DELIVER_AT_LEAST_ONCE : DELIVER_EXACTLY_ONCE;
641
642 uint8_t* iterator = initialPos;
643 size_t numberOfBytesToBeRead = packetLength;
644 size_t lengthOfTopicName = numberOfBytesToBeRead;
645 char* topicName = byteutil_readUTF(&iterator, &lengthOfTopicName);
646 if (topicName == NULL)
647 {
648 LogError("Publish MSG: failure reading topic name");
649 set_error_callback(mqtt_client, MQTT_CLIENT_PARSE_ERROR);
650 }
651 else
652 {
653 STRING_HANDLE trace_log = NULL;
654
655#ifndef NO_LOGGING
656 if (is_trace_enabled(mqtt_client))
657 {
658 trace_log = STRING_construct_sprintf("PUBLISH | IS_DUP: %s | RETAIN: %d | QOS: %s | TOPIC_NAME: %s", isDuplicateMsg ? TRUE_CONST : FALSE_CONST,
659 isRetainMsg ? 1 : 0, MU_ENUM_TO_STRING(QOS_VALUE, qosValue), topicName);
660 }
661#endif
662 uint16_t packetId = 0;
663 numberOfBytesToBeRead = packetLength - (iterator - initialPos);
664 if (qosValue != DELIVER_AT_MOST_ONCE)
665 {
666 packetId = byteutil_read_uint16(&iterator, numberOfBytesToBeRead);
667#ifndef NO_LOGGING
668 if (is_trace_enabled(mqtt_client))
669 {
670 STRING_sprintf(trace_log, " | PACKET_ID: %"PRIu16, packetId);
671 }
672#endif
673 }
674 if ((qosValue != DELIVER_AT_MOST_ONCE) && (packetId == 0))
675 {
676 LogError("Publish MSG: packetId=0, invalid");
677 set_error_callback(mqtt_client, MQTT_CLIENT_PARSE_ERROR);
678 }
679 else
680 {
681 numberOfBytesToBeRead = packetLength - (iterator - initialPos);
682
683 MQTT_MESSAGE_HANDLE msgHandle = mqttmessage_create_in_place(packetId, topicName, qosValue, iterator, numberOfBytesToBeRead);
684 if (msgHandle == NULL)
685 {
686 LogError("failure in mqttmessage_create");
687 set_error_callback(mqtt_client, MQTT_CLIENT_MEMORY_ERROR);
688 }
689 else if (mqttmessage_setIsDuplicateMsg(msgHandle, isDuplicateMsg) != 0 ||
690 mqttmessage_setIsRetained(msgHandle, isRetainMsg) != 0)
691 {
692 LogError("failure setting mqtt message property");
693 set_error_callback(mqtt_client, MQTT_CLIENT_MEMORY_ERROR);
694 }
695 else
696 {
697#ifndef NO_LOGGING
698 if (is_trace_enabled(mqtt_client))
699 {
700 STRING_sprintf(trace_log, " | PAYLOAD_LEN: %lu", (unsigned long)numberOfBytesToBeRead);
701 log_incoming_trace(mqtt_client, trace_log);
702 }
703#endif
704 mqtt_client->fnMessageRecv(msgHandle, mqtt_client->ctx);
705
706 BUFFER_HANDLE pubRel = NULL;
707 if (qosValue == DELIVER_EXACTLY_ONCE)
708 {
709 pubRel = mqtt_codec_publishReceived(packetId);
710 if (pubRel == NULL)
711 {
712 LogError("Failed to allocate publish receive message.");
713 set_error_callback(mqtt_client, MQTT_CLIENT_MEMORY_ERROR);
714 }
715 }
716 else if (qosValue == DELIVER_AT_LEAST_ONCE)
717 {
718 pubRel = mqtt_codec_publishAck(packetId);
719 if (pubRel == NULL)
720 {
721 LogError("Failed to allocate publish ack message.");
722 set_error_callback(mqtt_client, MQTT_CLIENT_MEMORY_ERROR);
723 }
724 }
725 if (pubRel != NULL)
726 {
727 size_t size = BUFFER_length(pubRel);
728 if (sendPacketItem(mqtt_client, BUFFER_u_char(pubRel), size) != 0)
729 {
730 LogError("Failed sending publish reply.");
731 set_error_callback(mqtt_client, MQTT_CLIENT_COMMUNICATION_ERROR);
732 }
733 BUFFER_delete(pubRel);
734 }
735 }
736 mqttmessage_destroy(msgHandle);
737 }
738
739 if (trace_log != NULL)
740 {
741 STRING_delete(trace_log);
742 }
743
744 free(topicName);
745 }
746}
747
748static void recvCompleteCallback(void* context, CONTROL_PACKET_TYPE packet, int flags, BUFFER_HANDLE headerData)
749{
750 MQTT_CLIENT* mqtt_client = (MQTT_CLIENT*)context;
751 if (mqtt_client != NULL)
752 {
753 size_t packetLength = 0;
754 uint8_t* iterator = NULL;
755 if (headerData != NULL)
756 {
757 packetLength = BUFFER_length(headerData);
758 iterator = BUFFER_u_char(headerData);
759 }
760
761#ifdef ENABLE_RAW_TRACE
762 logIncomingRawTrace(mqtt_client, packet, (uint8_t)flags, iterator, packetLength);
763#endif
764 if ((iterator != NULL && packetLength > 0) || packet == PINGRESP_TYPE)
765 {
766 switch (packet)
767 {
768 case CONNACK_TYPE:
769 {
770 /*Codes_SRS_MQTT_CLIENT_07_028: [If the actionResult parameter is of type CONNECT_ACK then the msgInfo value shall be a CONNECT_ACK structure.]*/
771 CONNECT_ACK connack = { 0 };
772 if (packetLength != 2) // CONNACK payload must be only 2 bytes
773 {
774 LogError("Invalid CONNACK packet.");
775 set_error_callback(mqtt_client, MQTT_CLIENT_COMMUNICATION_ERROR);
776 break;
777 }
778
779 uint8_t connect_acknowledge_flags = byteutil_readByte(&iterator);
780 if (connect_acknowledge_flags & 0xFE) // bits 7-1 must be zero
781 {
782 LogError("Invalid CONNACK packet.");
783 set_error_callback(mqtt_client, MQTT_CLIENT_COMMUNICATION_ERROR);
784 break;
785 }
786
787 connack.isSessionPresent = (connect_acknowledge_flags == 0x1) ? true : false;
788 uint8_t rc = byteutil_readByte(&iterator);
789 connack.returnCode =
790 (rc < ((uint8_t)CONN_REFUSED_UNKNOWN)) ?
791 (CONNECT_RETURN_CODE)rc : CONN_REFUSED_UNKNOWN;
792
793#ifndef NO_LOGGING
794 if (is_trace_enabled(mqtt_client))
795 {
796 STRING_HANDLE trace_log = STRING_construct_sprintf("CONNACK | SESSION_PRESENT: %s | RETURN_CODE: 0x%x", connack.isSessionPresent ? TRUE_CONST : FALSE_CONST, connack.returnCode);
797 log_incoming_trace(mqtt_client, trace_log);
798 STRING_delete(trace_log);
799 }
800#endif
801 mqtt_client->fnOperationCallback(mqtt_client, MQTT_CLIENT_ON_CONNACK, (void*)&connack, mqtt_client->ctx);
802
803 if (connack.returnCode == CONNECTION_ACCEPTED)
804 {
805 mqtt_client->mqtt_status |= MQTT_STATUS_CLIENT_CONNECTED;
806 }
807 break;
808 }
809 case PUBLISH_TYPE:
810 {
811 ProcessPublishMessage(mqtt_client, iterator, packetLength, flags);
812 break;
813 }
814 case PUBACK_TYPE:
815 case PUBREC_TYPE:
816 case PUBREL_TYPE:
817 case PUBCOMP_TYPE:
818 {
819 if (packetLength != 2) // PUBXXX payload must be only 2 bytes
820 {
821 LogError("Invalid packet length.");
822 set_error_callback(mqtt_client, MQTT_CLIENT_COMMUNICATION_ERROR);
823 break;
824 }
825
826 /*Codes_SRS_MQTT_CLIENT_07_029: [If the actionResult parameter are of types PUBACK_TYPE, PUBREC_TYPE, PUBREL_TYPE or PUBCOMP_TYPE then the msgInfo value shall be a PUBLISH_ACK structure.]*/
827 MQTT_CLIENT_EVENT_RESULT action = (packet == PUBACK_TYPE) ? MQTT_CLIENT_ON_PUBLISH_ACK :
828 (packet == PUBREC_TYPE) ? MQTT_CLIENT_ON_PUBLISH_RECV :
829 (packet == PUBREL_TYPE) ? MQTT_CLIENT_ON_PUBLISH_REL : MQTT_CLIENT_ON_PUBLISH_COMP;
830
831 PUBLISH_ACK publish_ack = { 0 };
832 publish_ack.packetId = byteutil_read_uint16(&iterator, packetLength);
833
834#ifndef NO_LOGGING
835 if (is_trace_enabled(mqtt_client))
836 {
837 STRING_HANDLE trace_log = STRING_construct_sprintf("%s | PACKET_ID: %"PRIu16, packet == PUBACK_TYPE ? "PUBACK" : (packet == PUBREC_TYPE) ? "PUBREC" : (packet == PUBREL_TYPE) ? "PUBREL" : "PUBCOMP",
838 publish_ack.packetId);
839
840 log_incoming_trace(mqtt_client, trace_log);
841 STRING_delete(trace_log);
842 }
843#endif
844 BUFFER_HANDLE pubRel = NULL;
845 mqtt_client->fnOperationCallback(mqtt_client, action, (void*)&publish_ack, mqtt_client->ctx);
846 if (packet == PUBREC_TYPE)
847 {
848 pubRel = mqtt_codec_publishRelease(publish_ack.packetId);
849 if (pubRel == NULL)
850 {
851 LogError("Failed to allocate publish release message.");
852 set_error_callback(mqtt_client, MQTT_CLIENT_MEMORY_ERROR);
853 }
854 }
855 else if (packet == PUBREL_TYPE)
856 {
857 pubRel = mqtt_codec_publishComplete(publish_ack.packetId);
858 if (pubRel == NULL)
859 {
860 LogError("Failed to allocate publish complete message.");
861 set_error_callback(mqtt_client, MQTT_CLIENT_MEMORY_ERROR);
862 }
863 }
864 if (pubRel != NULL)
865 {
866 size_t size = BUFFER_length(pubRel);
867 if (sendPacketItem(mqtt_client, BUFFER_u_char(pubRel), size) != 0)
868 {
869 LogError("Failed sending publish reply.");
870 set_error_callback(mqtt_client, MQTT_CLIENT_COMMUNICATION_ERROR);
871 }
872 BUFFER_delete(pubRel);
873 }
874 break;
875 }
876 case SUBACK_TYPE:
877 {
878
879 /*Codes_SRS_MQTT_CLIENT_07_030: [If the actionResult parameter is of type SUBACK_TYPE then the msgInfo value shall be a SUBSCRIBE_ACK structure.]*/
880 SUBSCRIBE_ACK suback = { 0 };
881
882 size_t remainLen = packetLength;
883 suback.packetId = byteutil_read_uint16(&iterator, packetLength);
884 remainLen -= 2;
885
886#ifndef NO_LOGGING
887 STRING_HANDLE trace_log = NULL;
888 if (is_trace_enabled(mqtt_client))
889 {
890 trace_log = STRING_construct_sprintf("SUBACK | PACKET_ID: %"PRIu16, suback.packetId);
891 }
892#endif
893 // Allocate the remaining len
894 suback.qosReturn = (QOS_VALUE*)malloc(sizeof(QOS_VALUE)*remainLen);
895 if (suback.qosReturn != NULL)
896 {
897 while (remainLen > 0)
898 {
899 uint8_t qosRet = byteutil_readByte(&iterator);
900 if (qosRet & 0x7C) // SUBACK QOS bits 6-2 must be zero
901 {
902 LogError("Invalid SUBACK_TYPE packet.");
903 set_error_callback(mqtt_client, MQTT_CLIENT_COMMUNICATION_ERROR);
904 break;
905 }
906 suback.qosReturn[suback.qosCount++] =
907 (qosRet <= ((uint8_t)DELIVER_EXACTLY_ONCE)) ?
908 (QOS_VALUE)qosRet : DELIVER_FAILURE;
909 remainLen--;
910#ifndef NO_LOGGING
911 if (is_trace_enabled(mqtt_client))
912 {
913 STRING_sprintf(trace_log, " | RETURN_CODE: %"PRIu16, suback.qosReturn[suback.qosCount-1]);
914 }
915#endif
916 }
917
918#ifndef NO_LOGGING
919 if (is_trace_enabled(mqtt_client))
920 {
921 log_incoming_trace(mqtt_client, trace_log);
922 STRING_delete(trace_log);
923 }
924#endif
925 mqtt_client->fnOperationCallback(mqtt_client, MQTT_CLIENT_ON_SUBSCRIBE_ACK, (void*)&suback, mqtt_client->ctx);
926 free(suback.qosReturn);
927 }
928 else
929 {
930 LogError("allocation of quality of service value failed.");
931 set_error_callback(mqtt_client, MQTT_CLIENT_MEMORY_ERROR);
932 }
933 break;
934 }
935 case UNSUBACK_TYPE:
936 {
937 /*Codes_SRS_MQTT_CLIENT_07_031: [If the actionResult parameter is of type UNSUBACK_TYPE then the msgInfo value shall be a UNSUBSCRIBE_ACK structure.]*/
938 UNSUBSCRIBE_ACK unsuback = { 0 };
939 if (packetLength != 2) // UNSUBACK_TYPE payload must be only 2 bytes
940 {
941 LogError("Invalid UNSUBACK packet length.");
942 set_error_callback(mqtt_client, MQTT_CLIENT_COMMUNICATION_ERROR);
943 break;
944 }
945
946 unsuback.packetId = byteutil_read_uint16(&iterator, packetLength);
947
948#ifndef NO_LOGGING
949 if (is_trace_enabled(mqtt_client))
950 {
951 STRING_HANDLE trace_log = STRING_construct_sprintf("UNSUBACK | PACKET_ID: %"PRIu16, unsuback.packetId);
952 log_incoming_trace(mqtt_client, trace_log);
953 STRING_delete(trace_log);
954 }
955#endif
956 mqtt_client->fnOperationCallback(mqtt_client, MQTT_CLIENT_ON_UNSUBSCRIBE_ACK, (void*)&unsuback, mqtt_client->ctx);
957 break;
958 }
959 case PINGRESP_TYPE:
960 mqtt_client->timeSincePing = 0;
961#ifndef NO_LOGGING
962 if (is_trace_enabled(mqtt_client))
963 {
964 STRING_HANDLE trace_log = STRING_construct_sprintf("PINGRESP");
965 log_incoming_trace(mqtt_client, trace_log);
966 STRING_delete(trace_log);
967 }
968#endif
969 // Forward ping response to operation callback
970 mqtt_client->fnOperationCallback(mqtt_client, MQTT_CLIENT_ON_PING_RESPONSE, NULL, mqtt_client->ctx);
971 break;
972 default:
973 break;
974 }
975 }
976 }
977 else
978 {
979 LogError("recvCompleteCallback context failed.");
980 }
981}
982
983void mqtt_client_clear_xio(MQTT_CLIENT_HANDLE handle)
984{
985 if (handle != NULL)
986 {
987 MQTT_CLIENT *mqtt_client = (MQTT_CLIENT *)handle;
988
989 // The upstream transport handle allocates and deallocates the xio handle.
990 // The reference to that xio handle is shared between this layer (mqtt layer)
991 // and the upstream layer. The clearing done here is to signal to this layer
992 // that the handle is no longer available to be used. This is different from
993 // deiniting the mqtt client in that we do not want an entire teardown, but
994 // only a possible re-upping of the xio in the future.
995 mqtt_client->xioHandle = NULL;
996 }
997}
998
999MQTT_CLIENT_HANDLE mqtt_client_init(ON_MQTT_MESSAGE_RECV_CALLBACK msgRecv, ON_MQTT_OPERATION_CALLBACK operation_cb, void* opCallbackCtx, ON_MQTT_ERROR_CALLBACK onErrorCallBack, void* errorCBCtx)
1000{
1001 MQTT_CLIENT* result;
1002 /*Codes_SRS_MQTT_CLIENT_07_001: [If the parameters ON_MQTT_MESSAGE_RECV_CALLBACK is NULL then mqttclient_init shall return NULL.]*/
1003 if (msgRecv == NULL || operation_cb == NULL)
1004 {
1005 LogError("Invalid parameter specified msgRecv: %p, operation_cb: %p", msgRecv, operation_cb);
1006 result = NULL;
1007 }
1008 else
1009 {
1010 result = malloc(sizeof(MQTT_CLIENT));
1011 if (result == NULL)
1012 {
1013 /*Codes_SRS_MQTT_CLIENT_07_002: [If any failure is encountered then mqttclient_init shall return NULL.]*/
1014 LogError("mqtt_client_init failure: Allocation Failure");
1015 }
1016 else
1017 {
1018 memset(result, 0, sizeof(MQTT_CLIENT));
1019 /*Codes_SRS_MQTT_CLIENT_07_003: [mqttclient_init shall allocate MQTTCLIENT_DATA_INSTANCE and return the MQTTCLIENT_HANDLE on success.]*/
1020 result->packetState = UNKNOWN_TYPE;
1021 result->fnOperationCallback = operation_cb;
1022 result->ctx = opCallbackCtx;
1023 result->fnMessageRecv = msgRecv;
1024 result->fnOnErrorCallBack = onErrorCallBack;
1025 result->errorCBCtx = errorCBCtx;
1026 result->qosValue = DELIVER_AT_MOST_ONCE;
1027 result->packetTickCntr = tickcounter_create();
1028 result->maxPingRespTime = DEFAULT_MAX_PING_RESPONSE_TIME;
1029 if (result->packetTickCntr == NULL)
1030 {
1031 /*Codes_SRS_MQTT_CLIENT_07_002: [If any failure is encountered then mqttclient_init shall return NULL.]*/
1032 LogError("mqtt_client_init failure: tickcounter_create failure");
1033 free(result);
1034 result = NULL;
1035 }
1036 else
1037 {
1038 result->codec_handle = mqtt_codec_create(recvCompleteCallback, result);
1039 if (result->codec_handle == NULL)
1040 {
1041 /*Codes_SRS_MQTT_CLIENT_07_002: [If any failure is encountered then mqttclient_init shall return NULL.]*/
1042 LogError("mqtt_client_init failure: mqtt_codec_create failure");
1043 tickcounter_destroy(result->packetTickCntr);
1044 free(result);
1045 result = NULL;
1046 }
1047 }
1048 }
1049 }
1050 return result;
1051}
1052
1053void mqtt_client_deinit(MQTT_CLIENT_HANDLE handle)
1054{
1055 /*Codes_SRS_MQTT_CLIENT_07_004: [If the parameter handle is NULL then function mqtt_client_deinit shall do nothing.]*/
1056 if (handle != NULL)
1057 {
1058 /*Codes_SRS_MQTT_CLIENT_07_005: [mqtt_client_deinit shall deallocate all memory allocated in this unit.]*/
1059 MQTT_CLIENT* mqtt_client = (MQTT_CLIENT*)handle;
1060 tickcounter_destroy(mqtt_client->packetTickCntr);
1061 mqtt_codec_destroy(mqtt_client->codec_handle);
1062 clear_mqtt_options(mqtt_client);
1063 free(mqtt_client);
1064 }
1065}
1066
1067int mqtt_client_connect(MQTT_CLIENT_HANDLE handle, XIO_HANDLE xioHandle, MQTT_CLIENT_OPTIONS* mqttOptions)
1068{
1069 int result;
1070 /*SRS_MQTT_CLIENT_07_006: [If any of the parameters handle, ioHandle, or mqttOptions are NULL then mqtt_client_connect shall return a non-zero value.]*/
1071 if (handle == NULL || mqttOptions == NULL || xioHandle == NULL)
1072 {
1073 LogError("mqtt_client_connect: NULL argument (handle = %p, mqttOptions = %p, xioHandle: %p)", handle, mqttOptions, xioHandle);
1074 result = MU_FAILURE;
1075 }
1076 else
1077 {
1078 MQTT_CLIENT* mqtt_client = (MQTT_CLIENT*)handle;
1079 mqtt_client->xioHandle = xioHandle;
1080 mqtt_client->packetState = UNKNOWN_TYPE;
1081 mqtt_client->qosValue = mqttOptions->qualityOfServiceValue;
1082 mqtt_client->keepAliveInterval = mqttOptions->keepAliveInterval;
1083 mqtt_client->maxPingRespTime = (DEFAULT_MAX_PING_RESPONSE_TIME < mqttOptions->keepAliveInterval/2) ? DEFAULT_MAX_PING_RESPONSE_TIME : mqttOptions->keepAliveInterval/2;
1084 if (cloneMqttOptions(mqtt_client, mqttOptions) != 0)
1085 {
1086 LogError("Error: Clone Mqtt Options failed");
1087 result = MU_FAILURE;
1088 }
1089 /*Codes_SRS_MQTT_CLIENT_07_008: [mqtt_client_connect shall open the XIO_HANDLE by calling into the xio_open interface.]*/
1090 else if (xio_open(xioHandle, onOpenComplete, mqtt_client, onBytesReceived, mqtt_client, onIoError, mqtt_client) != 0)
1091 {
1092 /*Codes_SRS_MQTT_CLIENT_07_007: [If any failure is encountered then mqtt_client_connect shall return a non-zero value.]*/
1093 LogError("Error: io_open failed");
1094 result = MU_FAILURE;
1095 mqtt_client->xioHandle = NULL;
1096 // Remove cloned options
1097 clear_mqtt_options(mqtt_client);
1098 }
1099 else
1100 {
1101 result = 0;
1102 }
1103 }
1104 return result;
1105}
1106
1107int mqtt_client_publish(MQTT_CLIENT_HANDLE handle, MQTT_MESSAGE_HANDLE msgHandle)
1108{
1109 int result;
1110 MQTT_CLIENT* mqtt_client = (MQTT_CLIENT*)handle;
1111 if (mqtt_client == NULL || msgHandle == NULL)
1112 {
1113 /*Codes_SRS_MQTT_CLIENT_07_019: [If one of the parameters handle or msgHandle is NULL then mqtt_client_publish shall return a non-zero value.]*/
1114 LogError("Invalid parameter specified mqtt_client: %p, msgHandle: %p", mqtt_client, msgHandle);
1115 result = MU_FAILURE;
1116 }
1117 else
1118 {
1119 /*Codes_SRS_MQTT_CLIENT_07_021: [mqtt_client_publish shall get the message information from the MQTT_MESSAGE_HANDLE.]*/
1120 const APP_PAYLOAD* payload = mqttmessage_getApplicationMsg(msgHandle);
1121 if (payload == NULL)
1122 {
1123 /*Codes_SRS_MQTT_CLIENT_07_020: [If any failure is encountered then mqtt_client_unsubscribe shall return a non-zero value.]*/
1124 LogError("Error: mqttmessage_getApplicationMsg failed");
1125 result = MU_FAILURE;
1126 }
1127 else
1128 {
1129 STRING_HANDLE trace_log = construct_trace_log_handle(mqtt_client);
1130
1131 QOS_VALUE qos = mqttmessage_getQosType(msgHandle);
1132 bool isDuplicate = mqttmessage_getIsDuplicateMsg(msgHandle);
1133 bool isRetained = mqttmessage_getIsRetained(msgHandle);
1134 uint16_t packetId = mqttmessage_getPacketId(msgHandle);
1135 const char* topicName = mqttmessage_getTopicName(msgHandle);
1136 BUFFER_HANDLE publishPacket = mqtt_codec_publish(qos, isDuplicate, isRetained, packetId, topicName, payload->message, payload->length, trace_log);
1137 if (publishPacket == NULL)
1138 {
1139 /*Codes_SRS_MQTT_CLIENT_07_020: [If any failure is encountered then mqtt_client_unsubscribe shall return a non-zero value.]*/
1140 LogError("Error: mqtt_codec_publish failed");
1141 result = MU_FAILURE;
1142 }
1143 else
1144 {
1145 mqtt_client->packetState = PUBLISH_TYPE;
1146
1147 /*Codes_SRS_MQTT_CLIENT_07_022: [On success mqtt_client_publish shall send the MQTT SUBCRIBE packet to the endpoint.]*/
1148 size_t size = BUFFER_length(publishPacket);
1149 if (sendPacketItem(mqtt_client, BUFFER_u_char(publishPacket), size) != 0)
1150 {
1151 /*Codes_SRS_MQTT_CLIENT_07_020: [If any failure is encountered then mqtt_client_unsubscribe shall return a non-zero value.]*/
1152 LogError("Error: mqtt_client_publish send failed");
1153 result = MU_FAILURE;
1154 }
1155 else
1156 {
1157 log_outgoing_trace(mqtt_client, trace_log);
1158 result = 0;
1159 }
1160 BUFFER_delete(publishPacket);
1161 }
1162 if (trace_log != NULL)
1163 {
1164 STRING_delete(trace_log);
1165 }
1166 }
1167 }
1168 return result;
1169}
1170
1171int mqtt_client_subscribe(MQTT_CLIENT_HANDLE handle, uint16_t packetId, SUBSCRIBE_PAYLOAD* subscribeList, size_t count)
1172{
1173 int result;
1174 MQTT_CLIENT* mqtt_client = (MQTT_CLIENT*)handle;
1175 if (mqtt_client == NULL || subscribeList == NULL || count == 0 || packetId == 0)
1176 {
1177 /*Codes_SRS_MQTT_CLIENT_07_013: [If any of the parameters handle, subscribeList is NULL or count is 0 then mqtt_client_subscribe shall return a non-zero value.]*/
1178 LogError("Invalid parameter specified mqtt_client: %p, subscribeList: %p, count: %lu, packetId: %d", mqtt_client, subscribeList, (unsigned long)count, packetId);
1179 result = MU_FAILURE;
1180 }
1181 else
1182 {
1183 STRING_HANDLE trace_log = construct_trace_log_handle(mqtt_client);
1184
1185 BUFFER_HANDLE subPacket = mqtt_codec_subscribe(packetId, subscribeList, count, trace_log);
1186 if (subPacket == NULL)
1187 {
1188 /*Codes_SRS_MQTT_CLIENT_07_014: [If any failure is encountered then mqtt_client_subscribe shall return a non-zero value.]*/
1189 LogError("Error: mqtt_codec_subscribe failed");
1190 result = MU_FAILURE;
1191 }
1192 else
1193 {
1194 mqtt_client->packetState = SUBSCRIBE_TYPE;
1195
1196 size_t size = BUFFER_length(subPacket);
1197 /*Codes_SRS_MQTT_CLIENT_07_015: [On success mqtt_client_subscribe shall send the MQTT SUBCRIBE packet to the endpoint.]*/
1198 if (sendPacketItem(mqtt_client, BUFFER_u_char(subPacket), size) != 0)
1199 {
1200 /*Codes_SRS_MQTT_CLIENT_07_014: [If any failure is encountered then mqtt_client_subscribe shall return a non-zero value.]*/
1201 LogError("Error: mqtt_client_subscribe send failed");
1202 result = MU_FAILURE;
1203 }
1204 else
1205 {
1206 log_outgoing_trace(mqtt_client, trace_log);
1207 result = 0;
1208 }
1209 BUFFER_delete(subPacket);
1210 }
1211 if (trace_log != NULL)
1212 {
1213 STRING_delete(trace_log);
1214 }
1215 }
1216 return result;
1217}
1218
1219int mqtt_client_unsubscribe(MQTT_CLIENT_HANDLE handle, uint16_t packetId, const char** unsubscribeList, size_t count)
1220{
1221 int result;
1222 MQTT_CLIENT* mqtt_client = (MQTT_CLIENT*)handle;
1223 if (mqtt_client == NULL || unsubscribeList == NULL || count == 0 || packetId == 0)
1224 {
1225 /*Codes_SRS_MQTT_CLIENT_07_016: [If any of the parameters handle, unsubscribeList is NULL or count is 0 then mqtt_client_unsubscribe shall return a non-zero value.]*/
1226 LogError("Invalid parameter specified mqtt_client: %p, unsubscribeList: %p, count: %lu, packetId: %d", mqtt_client, unsubscribeList, (unsigned long)count, packetId);
1227 result = MU_FAILURE;
1228 }
1229 else
1230 {
1231 STRING_HANDLE trace_log = construct_trace_log_handle(mqtt_client);
1232
1233 BUFFER_HANDLE unsubPacket = mqtt_codec_unsubscribe(packetId, unsubscribeList, count, trace_log);
1234 if (unsubPacket == NULL)
1235 {
1236 /*Codes_SRS_MQTT_CLIENT_07_017: [If any failure is encountered then mqtt_client_unsubscribe shall return a non-zero value.]*/
1237 LogError("Error: mqtt_codec_unsubscribe failed");
1238 result = MU_FAILURE;
1239 }
1240 else
1241 {
1242 mqtt_client->packetState = UNSUBSCRIBE_TYPE;
1243
1244 size_t size = BUFFER_length(unsubPacket);
1245 /*Codes_SRS_MQTT_CLIENT_07_018: [On success mqtt_client_unsubscribe shall send the MQTT SUBCRIBE packet to the endpoint.]*/
1246 if (sendPacketItem(mqtt_client, BUFFER_u_char(unsubPacket), size) != 0)
1247 {
1248 /*Codes_SRS_MQTT_CLIENT_07_017: [If any failure is encountered then mqtt_client_unsubscribe shall return a non-zero value.].]*/
1249 LogError("Error: mqtt_client_unsubscribe send failed");
1250 result = MU_FAILURE;
1251 }
1252 else
1253 {
1254 log_outgoing_trace(mqtt_client, trace_log);
1255 result = 0;
1256 }
1257 BUFFER_delete(unsubPacket);
1258 }
1259 if (trace_log != NULL)
1260 {
1261 STRING_delete(trace_log);
1262 }
1263 }
1264 return result;
1265}
1266
1267int mqtt_client_disconnect(MQTT_CLIENT_HANDLE handle, ON_MQTT_DISCONNECTED_CALLBACK callback, void* ctx)
1268{
1269 int result;
1270 MQTT_CLIENT* mqtt_client = (MQTT_CLIENT*)handle;
1271 if (mqtt_client == NULL)
1272 {
1273 /*Codes_SRS_MQTT_CLIENT_07_010: [If the parameters handle is NULL then mqtt_client_disconnect shall return a non-zero value.]*/
1274 result = MU_FAILURE;
1275 }
1276 else
1277 {
1278 if (mqtt_client->mqtt_status & MQTT_STATUS_CLIENT_CONNECTED)
1279 {
1280 BUFFER_HANDLE disconnectPacket = mqtt_codec_disconnect();
1281 if (disconnectPacket == NULL)
1282 {
1283 /*Codes_SRS_MQTT_CLIENT_07_011: [If any failure is encountered then mqtt_client_disconnect shall return a non-zero value.]*/
1284 LogError("Error: mqtt_client_disconnect failed");
1285 mqtt_client->packetState = PACKET_TYPE_ERROR;
1286 result = MU_FAILURE;
1287 }
1288 else
1289 {
1290 /* Codes_SRS_MQTT_CLIENT_07_037: [ if callback is not NULL callback shall be called once the mqtt connection has been disconnected ] */
1291 mqtt_client->disconnect_cb = callback;
1292 mqtt_client->disconnect_ctx = ctx;
1293 mqtt_client->packetState = DISCONNECT_TYPE;
1294
1295 size_t size = BUFFER_length(disconnectPacket);
1296 /*Codes_SRS_MQTT_CLIENT_07_012: [On success mqtt_client_disconnect shall send the MQTT DISCONNECT packet to the endpoint.]*/
1297 if (sendPacketItem(mqtt_client, BUFFER_u_char(disconnectPacket), size) != 0)
1298 {
1299 /*Codes_SRS_MQTT_CLIENT_07_011: [If any failure is encountered then mqtt_client_disconnect shall return a non-zero value.]*/
1300 LogError("Error: mqtt_client_disconnect send failed");
1301 result = MU_FAILURE;
1302 }
1303 else
1304 {
1305 if (is_trace_enabled(mqtt_client))
1306 {
1307 STRING_HANDLE trace_log = STRING_construct("DISCONNECT");
1308 log_outgoing_trace(mqtt_client, trace_log);
1309 STRING_delete(trace_log);
1310 }
1311 result = 0;
1312 }
1313 BUFFER_delete(disconnectPacket);
1314 }
1315 clear_mqtt_options(mqtt_client);
1316 }
1317 else
1318 {
1319 // If the client is not connected then just close the underlying socket
1320 mqtt_client->disconnect_cb = callback;
1321 mqtt_client->disconnect_ctx = ctx;
1322
1323 close_connection(mqtt_client);
1324 clear_mqtt_options(mqtt_client);
1325 result = 0;
1326 }
1327 }
1328 return result;
1329}
1330
1331void mqtt_client_dowork(MQTT_CLIENT_HANDLE handle)
1332{
1333 MQTT_CLIENT* mqtt_client = (MQTT_CLIENT*)handle;
1334 /*Codes_SRS_MQTT_CLIENT_18_001: [If the client is disconnected, mqtt_client_dowork shall do nothing.]*/
1335 /*Codes_SRS_MQTT_CLIENT_07_023: [If the parameter handle is NULL then mqtt_client_dowork shall do nothing.]*/
1336 if (mqtt_client != NULL && mqtt_client->xioHandle != NULL)
1337 {
1338 if (mqtt_client->mqtt_status & MQTT_STATUS_PENDING_CLOSE)
1339 {
1340 close_connection(mqtt_client);
1341 // turn off pending close
1342 mqtt_client->mqtt_status &= ~MQTT_STATUS_PENDING_CLOSE;
1343 }
1344 else
1345 {
1346 /*Codes_SRS_MQTT_CLIENT_07_024: [mqtt_client_dowork shall call the xio_dowork function to complete operations.]*/
1347 xio_dowork(mqtt_client->xioHandle);
1348
1349 /*Codes_SRS_MQTT_CLIENT_07_025: [mqtt_client_dowork shall retrieve the the last packet send value and ...]*/
1350 if (mqtt_client->mqtt_status & MQTT_STATUS_SOCKET_CONNECTED &&
1351 mqtt_client->mqtt_status & MQTT_STATUS_CLIENT_CONNECTED &&
1352 mqtt_client->keepAliveInterval > 0)
1353 {
1354 tickcounter_ms_t current_ms;
1355 if (tickcounter_get_current_ms(mqtt_client->packetTickCntr, &current_ms) != 0)
1356 {
1357 LogError("Error: tickcounter_get_current_ms failed");
1358 }
1359 else
1360 {
1361 /* Codes_SRS_MQTT_CLIENT_07_035: [If the timeSincePing has expired past the maxPingRespTime then mqtt_client_dowork shall call the Error Callback function with the message MQTT_CLIENT_NO_PING_RESPONSE] */
1362 if (mqtt_client->timeSincePing > 0 && ((current_ms - mqtt_client->timeSincePing)/1000) > mqtt_client->maxPingRespTime)
1363 {
1364 // We haven't gotten a ping response in the alloted time
1365 set_error_callback(mqtt_client, MQTT_CLIENT_NO_PING_RESPONSE);
1366 mqtt_client->timeSincePing = 0;
1367 mqtt_client->packetSendTimeMs = 0;
1368 mqtt_client->packetState = UNKNOWN_TYPE;
1369 }
1370 else if (((current_ms - mqtt_client->packetSendTimeMs) / 1000) >= mqtt_client->keepAliveInterval)
1371 {
1372 /*Codes_SRS_MQTT_CLIENT_07_026: [if keepAliveInternal is > 0 and the send time is greater than the MQTT KeepAliveInterval then it shall construct an MQTT PINGREQ packet.]*/
1373 BUFFER_HANDLE pingPacket = mqtt_codec_ping();
1374 if (pingPacket != NULL)
1375 {
1376 size_t size = BUFFER_length(pingPacket);
1377 (void)sendPacketItem(mqtt_client, BUFFER_u_char(pingPacket), size);
1378 BUFFER_delete(pingPacket);
1379 (void)tickcounter_get_current_ms(mqtt_client->packetTickCntr, &mqtt_client->timeSincePing);
1380
1381 if (is_trace_enabled(mqtt_client))
1382 {
1383 STRING_HANDLE trace_log = STRING_construct("PINGREQ");
1384 log_outgoing_trace(mqtt_client, trace_log);
1385 STRING_delete(trace_log);
1386 }
1387 }
1388 }
1389 }
1390 }
1391 }
1392 }
1393}
1394
1395void mqtt_client_set_trace(MQTT_CLIENT_HANDLE handle, bool traceOn, bool rawBytesOn)
1396{
1397 AZURE_UNREFERENCED_PARAMETER(handle);
1398 AZURE_UNREFERENCED_PARAMETER(traceOn);
1399 AZURE_UNREFERENCED_PARAMETER(rawBytesOn);
1400#ifndef NO_LOGGING
1401 if (handle != NULL)
1402 {
1403 if (traceOn)
1404 {
1405 handle->mqtt_flags |= MQTT_FLAGS_LOG_TRACE;
1406 }
1407 else
1408 {
1409 handle->mqtt_flags &= ~MQTT_FLAGS_LOG_TRACE;
1410 }
1411#ifdef ENABLE_RAW_TRACE
1412 if (rawBytesOn)
1413 {
1414 handle->mqtt_flags |= MQTT_FLAGS_RAW_TRACE;
1415 }
1416 else
1417 {
1418 handle->mqtt_flags &= ~MQTT_FLAGS_RAW_TRACE;
1419 }
1420#endif
1421 }
1422#endif
1423}
Note: See TracBrowser for help on using the repository browser.