source: azure_iot_hub_f767zi/trunk/azure_iot_sdk/iothub_client/src/iothubtransport_mqtt_common.c@ 464

Last change on this file since 464 was 464, checked in by coas-nagasima, 3 years ago

WolfSSLとAzure IoT SDKを更新

  • Property svn:eol-style set to native
  • Property svn:mime-type set to text/x-csrc;charset=UTF-8
File size: 170.1 KB
Line 
1// Copyright (c) Microsoft. All rights reserved.
2// Licensed under the MIT license. See LICENSE file in the project root for full license information.
3
4#include <stdlib.h>
5#include <ctype.h>
6
7#include "azure_c_shared_utility/optimize_size.h"
8#include "azure_c_shared_utility/gballoc.h"
9#include "azure_c_shared_utility/xlogging.h"
10#include "azure_c_shared_utility/strings.h"
11#include "azure_c_shared_utility/doublylinkedlist.h"
12#include "azure_c_shared_utility/crt_abstractions.h"
13#include "azure_c_shared_utility/agenttime.h"
14#include "azure_c_shared_utility/threadapi.h"
15#include "azure_c_shared_utility/sastoken.h"
16#include "azure_c_shared_utility/tickcounter.h"
17#include "azure_c_shared_utility/tlsio.h"
18#include "azure_c_shared_utility/platform.h"
19#include "azure_c_shared_utility/string_tokenizer.h"
20#include "azure_c_shared_utility/shared_util_options.h"
21#include "azure_c_shared_utility/urlencode.h"
22
23#include "internal/iothub_client_private.h"
24#include "internal/iothub_client_retry_control.h"
25#include "internal/iothub_transport_ll_private.h"
26#include "internal/iothubtransport_mqtt_common.h"
27#include "internal/iothubtransport.h"
28#include "internal/iothub_internal_consts.h"
29
30#include "azure_umqtt_c/mqtt_client.h"
31
32#include "iothub_client_options.h"
33#include "iothub_client_version.h"
34
35#include <stdarg.h>
36#include <stdio.h>
37
38#include <limits.h>
39#include <inttypes.h>
40
// Timing, retry and sizing defaults used by the MQTT transport.
// Macros that expand to an arithmetic expression are wrapped in parentheses so that
// they keep their value when used inside a larger expression (for example
// `elapsed / RESEND_TIMEOUT_VALUE_MIN` must divide by 60, not by 1 and then multiply by 60).
#define SAS_REFRESH_MULTIPLIER .8
#define EPOCH_TIME_T_VALUE 0
#define DEFAULT_MQTT_KEEPALIVE (4*60) // 4 min
#define DEFAULT_CONNACK_TIMEOUT 30 // 30 seconds
#define BUILD_CONFIG_USERNAME 24
#define SAS_TOKEN_DEFAULT_LEN 10
#define RESEND_TIMEOUT_VALUE_MIN (1*60)
#define MAX_SEND_RECOUNT_LIMIT 2
#define DEFAULT_CONNECTION_INTERVAL 30
#define FAILED_CONN_BACKOFF_VALUE 5
#define STATUS_CODE_FAILURE_VALUE 500
#define STATUS_CODE_TIMEOUT_VALUE 408

#define DEFAULT_RETRY_POLICY IOTHUB_CLIENT_RETRY_EXPONENTIAL_BACKOFF_WITH_JITTER
#define DEFAULT_RETRY_TIMEOUT_IN_SECONDS 0
#define MAX_DISCONNECT_VALUE 50

#define ON_DEMAND_GET_TWIN_REQUEST_TIMEOUT_SECS 60
#define TWIN_REPORT_UPDATE_TIMEOUT_SECS (60*5)
60
61static const char TOPIC_DEVICE_TWIN_PREFIX[] = "$iothub/twin";
62static const char TOPIC_DEVICE_METHOD_PREFIX[] = "$iothub/methods";
63
64static const char* TOPIC_GET_DESIRED_STATE = "$iothub/twin/res/#";
65static const char* TOPIC_NOTIFICATION_STATE = "$iothub/twin/PATCH/properties/desired/#";
66
67static const char* TOPIC_DEVICE_MSG = "devices/%s/messages/devicebound/#";
68static const char* TOPIC_DEVICE_MODULE_MSG = "devices/%s/modules/%s/messages/devicebound/#";
69static const char* TOPIC_DEVICE_DEVICE = "devices/%s/messages/events/";
70static const char* TOPIC_DEVICE_DEVICE_MODULE = "devices/%s/modules/%s/messages/events/";
71
72static const char* TOPIC_INPUT_QUEUE_NAME = "devices/%s/modules/%s/#";
73
74static const char* TOPIC_DEVICE_METHOD_SUBSCRIBE = "$iothub/methods/POST/#";
75
76static const char* PROPERTY_SEPARATOR = "&";
77static const char* REPORTED_PROPERTIES_TOPIC = "$iothub/twin/PATCH/properties/reported/?$rid=%"PRIu16;
78static const char* GET_PROPERTIES_TOPIC = "$iothub/twin/GET/?$rid=%"PRIu16;
79static const char* DEVICE_METHOD_RESPONSE_TOPIC = "$iothub/methods/res/%d/?$rid=%s";
80
81static const char* REQUEST_ID_PROPERTY = "?$rid=";
82
83static const char* MESSAGE_ID_PROPERTY = "mid";
84static const char* MESSAGE_CREATION_TIME_UTC = "ctime";
85static const char* MESSAGE_USER_ID = "uid";
86static const char* CORRELATION_ID_PROPERTY = "cid";
87static const char* CONTENT_TYPE_PROPERTY = "ct";
88static const char* CONTENT_ENCODING_PROPERTY = "ce";
89static const char* DIAGNOSTIC_ID_PROPERTY = "diagid";
90static const char* DIAGNOSTIC_CONTEXT_PROPERTY = "diagctx";
91static const char* CONNECTION_DEVICE_ID = "cdid";
92static const char* CONNECTION_MODULE_ID_PROPERTY = "cmid";
93
94static const char* DIAGNOSTIC_CONTEXT_CREATION_TIME_UTC_PROPERTY = "creationtimeutc";
95
96static const char DT_MODEL_ID_TOKEN[] = "model-id";
97
98static const char DEFAULT_IOTHUB_PRODUCT_IDENTIFIER[] = CLIENT_DEVICE_TYPE_PREFIX "/" IOTHUB_SDK_VERSION;
99
// TOLOWER maps an ASCII upper-case letter ('A'..'Z') to its lower-case form and yields any
// other value unchanged. The argument is parenthesized at every use so that an operator
// expression passed as `c` expands correctly. It is still evaluated more than once, so the
// argument must not have side effects.
#define TOLOWER(c) ((((c) >= 'A') && ((c) <= 'Z')) ? ((c) - 'A' + 'a') : (c))
101
102#define UNSUBSCRIBE_FROM_TOPIC 0x0000
103#define SUBSCRIBE_GET_REPORTED_STATE_TOPIC 0x0001
104#define SUBSCRIBE_NOTIFICATION_STATE_TOPIC 0x0002
105#define SUBSCRIBE_TELEMETRY_TOPIC 0x0004
106#define SUBSCRIBE_DEVICE_METHOD_TOPIC 0x0008
107#define SUBSCRIBE_INPUT_QUEUE_TOPIC 0x0010
108#define SUBSCRIBE_TOPIC_COUNT 5
109
110MU_DEFINE_ENUM_STRINGS_WITHOUT_INVALID(MQTT_CLIENT_EVENT_ERROR, MQTT_CLIENT_EVENT_ERROR_VALUES)
111
112typedef struct SYSTEM_PROPERTY_INFO_TAG
113{
114 const char* propName;
115 size_t propLength;
116} SYSTEM_PROPERTY_INFO;
117
118static SYSTEM_PROPERTY_INFO sysPropList[] = {
119 { "%24.exp", 7 },
120 { "%24.mid", 7 },
121 { "%24.uid", 7 },
122 { "%24.to", 6 },
123 { "%24.cid", 7 },
124 { "%24.ct", 6 },
125 { "%24.ce", 6 },
126 { "devices/", 8 },
127 { "iothub-operation", 16 },
128 { "iothub-ack", 10 },
129 { "%24.on", 6 },
130 { "%24.cdid", 8 },
131 { "%24.cmid", 8 }
132};
133
134static const int slashes_to_reach_input_name = 5;
135
136typedef enum DEVICE_TWIN_MSG_TYPE_TAG
137{
138 REPORTED_STATE,
139 RETRIEVE_PROPERTIES
140} DEVICE_TWIN_MSG_TYPE;
141
142typedef enum MQTT_TRANSPORT_CREDENTIAL_TYPE_TAG
143{
144 CREDENTIAL_NOT_BUILD,
145 X509,
146 SAS_TOKEN_FROM_USER,
147 DEVICE_KEY,
148} MQTT_TRANSPORT_CREDENTIAL_TYPE;
149
150typedef enum MQTT_CLIENT_STATUS_TAG
151{
152 MQTT_CLIENT_STATUS_NOT_CONNECTED,
153 MQTT_CLIENT_STATUS_CONNECTING,
154 MQTT_CLIENT_STATUS_CONNECTED,
155 MQTT_CLIENT_STATUS_PENDING_CLOSE,
156 MQTT_CLIENT_STATUS_EXECUTE_DISCONNECT
157} MQTT_CLIENT_STATUS;
158
159typedef struct MQTTTRANSPORT_HANDLE_DATA_TAG
160{
161 // Topic control
162 STRING_HANDLE topic_MqttEvent;
163 STRING_HANDLE topic_MqttMessage;
164 STRING_HANDLE topic_GetState;
165 STRING_HANDLE topic_NotifyState;
166 STRING_HANDLE topic_InputQueue;
167
168 STRING_HANDLE topic_DeviceMethods;
169
170 uint32_t topics_ToSubscribe;
171
172 // Connection related constants
173 STRING_HANDLE hostAddress;
174 STRING_HANDLE device_id;
175 STRING_HANDLE module_id;
176 STRING_HANDLE devicesAndModulesPath;
177 int portNum;
178 // conn_attempted indicates whether a connection has *ever* been attempted on the lifetime
179 // of this handle. Even if a given xio transport is added/removed, this always stays true.
180 bool conn_attempted;
181
182 MQTT_GET_IO_TRANSPORT get_io_transport;
183
184 // The current mqtt iothub implementation requires that the hub name and the domain suffix be passed as the first of a series of segments
185 // passed through the username portion of the connection frame.
186 // The second segment will contain the device id. The two segments are delimited by a "/".
187 // The first segment can be a maximum 256 characters.
188 // The second segment can be a maximum 128 characters.
189 // With the / delimiter you have 384 chars (Plus a terminator of 0).
190 STRING_HANDLE configPassedThroughUsername;
191
192 // Protocol
193 MQTT_CLIENT_HANDLE mqttClient;
194 XIO_HANDLE xioTransport;
195
196 // Session - connection
197 uint16_t packetId;
198 uint16_t twin_resp_packet_id;
199
200 // Connection state control
201 bool isRegistered;
202 MQTT_CLIENT_STATUS mqttClientStatus;
203 bool isDestroyCalled;
204 bool isRetryExpiredCallbackCalled;
205 bool device_twin_get_sent;
206 bool twin_resp_sub_recv;
207 bool isRecoverableError;
208 uint16_t keepAliveValue;
209 uint16_t connect_timeout_in_sec;
210 tickcounter_ms_t mqtt_connect_time;
211 size_t connectFailCount;
212 tickcounter_ms_t connectTick;
213 bool log_trace;
214 bool raw_trace;
215 TICK_COUNTER_HANDLE msgTickCounter;
216 OPTIONHANDLER_HANDLE saved_tls_options; // Here are the options from the xio layer if any is saved.
217
218 // Internal lists for message tracking
219 PDLIST_ENTRY waitingToSend;
220 DLIST_ENTRY ack_waiting_queue;
221
222 DLIST_ENTRY pending_get_twin_queue;
223
224 // Message tracking
225 CONTROL_PACKET_TYPE currPacketState;
226
227 // Telemetry specific
228 DLIST_ENTRY telemetry_waitingForAck;
229 bool auto_url_encode_decode;
230
231 // Controls frequency of reconnection logic.
232 RETRY_CONTROL_HANDLE retry_control_handle;
233
234 // Auth module used to generating handle authorization
235 // with either SAS Token, x509 Certs, and Device SAS Token
236 IOTHUB_AUTHORIZATION_HANDLE authorization_module;
237
238 TRANSPORT_CALLBACKS_INFO transport_callbacks;
239 void* transport_ctx;
240
241 char* http_proxy_hostname;
242 int http_proxy_port;
243 char* http_proxy_username;
244 char* http_proxy_password;
245 bool isConnectUsernameSet;
246 int disconnect_recv_flag;
247} MQTTTRANSPORT_HANDLE_DATA, *PMQTTTRANSPORT_HANDLE_DATA;
248
249typedef struct MQTT_DEVICE_TWIN_ITEM_TAG
250{
251 tickcounter_ms_t msgCreationTime;
252 tickcounter_ms_t msgPublishTime;
253 size_t retryCount;
254 uint16_t packet_id;
255 uint32_t iothub_msg_id;
256 IOTHUB_DEVICE_TWIN* device_twin_data;
257 DEVICE_TWIN_MSG_TYPE device_twin_msg_type;
258 DLIST_ENTRY entry;
259 IOTHUB_CLIENT_DEVICE_TWIN_CALLBACK userCallback;
260 void* userContext;
261} MQTT_DEVICE_TWIN_ITEM;
262
263typedef struct MQTT_MESSAGE_DETAILS_LIST_TAG
264{
265 tickcounter_ms_t msgPublishTime;
266 size_t retryCount;
267 IOTHUB_MESSAGE_LIST* iotHubMessageEntry;
268 void* context;
269 uint16_t packet_id;
270 DLIST_ENTRY entry;
271} MQTT_MESSAGE_DETAILS_LIST, *PMQTT_MESSAGE_DETAILS_LIST;
272
273typedef struct DEVICE_METHOD_INFO_TAG
274{
275 STRING_HANDLE request_id;
276} DEVICE_METHOD_INFO;
277
//
// InternStrnicmp implements strnicmp. strnicmp isn't available on all platforms.
//
// Compares at most n characters of s1 and s2, ignoring the case of ASCII letters.
// Returns:
//   0        if the compared characters are equal, including the case where both strings
//            end at the same position before n characters have been examined (or n is 0);
//   -1       if s1 is NULL, or s1 ends before s2 within the first n characters;
//   1        if s2 is NULL (and s1 is not), or s2 ends before s1 within the first n characters;
//   non-zero difference of the first pair of lower-cased characters that differ otherwise.
//
static int InternStrnicmp(const char* s1, const char* s2, size_t n)
{
    int result = 0;

    if (s1 == NULL)
    {
        result = -1;
    }
    else if (s2 == NULL)
    {
        result = 1;
    }
    else
    {
        while (n-- > 0 && result == 0)
        {
            if (*s1 == '\0' && *s2 == '\0')
            {
                // Both strings ended together: everything compared so far matched, so the
                // strings are equal. Previously this case was reported as "s1 is shorter".
                break;
            }
            else if (*s1 == '\0')
            {
                result = -1;
            }
            else if (*s2 == '\0')
            {
                result = 1;
            }
            else
            {
                // Lower-case ASCII letters only; all other values are compared as-is.
                int left = *s1;
                int right = *s2;
                if (left >= 'A' && left <= 'Z')
                {
                    left += 'a' - 'A';
                }
                if (right >= 'A' && right <= 'Z')
                {
                    right += 'a' - 'A';
                }
                result = left - right;
                ++s1;
                ++s2;
            }
        }
    }
    return result;
}
312
313//
314// freeProxyData free()'s and resets proxy related settings of the mqtt_transport_instance.
315//
316static void freeProxyData(MQTTTRANSPORT_HANDLE_DATA* transport_data)
317{
318 if (transport_data->http_proxy_hostname != NULL)
319 {
320 free(transport_data->http_proxy_hostname);
321 transport_data->http_proxy_hostname = NULL;
322 }
323
324 if (transport_data->http_proxy_username != NULL)
325 {
326 free(transport_data->http_proxy_username);
327 transport_data->http_proxy_username = NULL;
328 }
329
330 if (transport_data->http_proxy_password != NULL)
331 {
332 free(transport_data->http_proxy_password);
333 transport_data->http_proxy_password = NULL;
334 }
335}
336
337//
338// DestroyXioTransport frees resources associated with MQTT handle and resets appropriate state
339//
340static void DestroyXioTransport(PMQTTTRANSPORT_HANDLE_DATA transport_data)
341{
342 mqtt_client_clear_xio(transport_data->mqttClient);
343 xio_destroy(transport_data->xioTransport);
344 transport_data->xioTransport = NULL;
345}
346
347//
348// setSavedTlsOptions saves off TLS specific options. This is used
349// so that during a disconnection, we have these values available for next reconnection.
350//
351static void setSavedTlsOptions(PMQTTTRANSPORT_HANDLE_DATA transport, OPTIONHANDLER_HANDLE new_options)
352{
353 if (transport->saved_tls_options != NULL)
354 {
355 OptionHandler_Destroy(transport->saved_tls_options);
356 }
357 transport->saved_tls_options = new_options;
358}
359
360//
361// freeTransportHandleData free()'s 'the transport_data and all members that were allocated by it.
362//
363static void freeTransportHandleData(MQTTTRANSPORT_HANDLE_DATA* transport_data)
364{
365 if (transport_data->mqttClient != NULL)
366 {
367 mqtt_client_deinit(transport_data->mqttClient);
368 transport_data->mqttClient = NULL;
369 }
370
371 if (transport_data->retry_control_handle != NULL)
372 {
373 retry_control_destroy(transport_data->retry_control_handle);
374 }
375
376 setSavedTlsOptions(transport_data, NULL);
377
378 tickcounter_destroy(transport_data->msgTickCounter);
379
380 freeProxyData(transport_data);
381
382 STRING_delete(transport_data->devicesAndModulesPath);
383 STRING_delete(transport_data->topic_MqttEvent);
384 STRING_delete(transport_data->topic_MqttMessage);
385 STRING_delete(transport_data->device_id);
386 STRING_delete(transport_data->module_id);
387 STRING_delete(transport_data->hostAddress);
388 STRING_delete(transport_data->configPassedThroughUsername);
389 STRING_delete(transport_data->topic_GetState);
390 STRING_delete(transport_data->topic_NotifyState);
391 STRING_delete(transport_data->topic_DeviceMethods);
392 STRING_delete(transport_data->topic_InputQueue);
393
394 DestroyXioTransport(transport_data);
395
396 free(transport_data);
397}
398
399//
400// getNextPacketId gets the next Packet Id to use and increments internal counter.
401//
402static uint16_t getNextPacketId(PMQTTTRANSPORT_HANDLE_DATA transport_data)
403{
404 if (transport_data->packetId + 1 >= USHRT_MAX)
405 {
406 transport_data->packetId = 1;
407 }
408 else
409 {
410 transport_data->packetId++;
411 }
412 return transport_data->packetId;
413}
414
415#ifndef NO_LOGGING
416//
417// retrieveMqttReturnCodes returns friendly representation of connection code for logging purposes.
418//
419static const char* retrieveMqttReturnCodes(CONNECT_RETURN_CODE rtn_code)
420{
421 switch (rtn_code)
422 {
423 case CONNECTION_ACCEPTED:
424 return "Accepted";
425 case CONN_REFUSED_UNACCEPTABLE_VERSION:
426 return "Unacceptable Version";
427 case CONN_REFUSED_ID_REJECTED:
428 return "Id Rejected";
429 case CONN_REFUSED_SERVER_UNAVAIL:
430 return "Server Unavailable";
431 case CONN_REFUSED_BAD_USERNAME_PASSWORD:
432 return "Bad Username/Password";
433 case CONN_REFUSED_NOT_AUTHORIZED:
434 return "Not Authorized";
435 case CONN_REFUSED_UNKNOWN:
436 default:
437 return "Unknown";
438 }
439}
440#endif // NO_LOGGING
441
442//
443// retrievDeviceMethodRidInfo parses an incoming MQTT topic for a device method and retrieves the request ID it specifies.
444//
445static int retrievDeviceMethodRidInfo(const char* resp_topic, STRING_HANDLE method_name, STRING_HANDLE request_id)
446{
447 int result;
448
449 STRING_TOKENIZER_HANDLE token_handle = STRING_TOKENIZER_create_from_char(resp_topic);
450 if (token_handle == NULL)
451 {
452 LogError("Failed creating token from device twin topic.");
453 result = MU_FAILURE;
454 }
455 else
456 {
457 STRING_HANDLE token_value;
458 if ((token_value = STRING_new()) == NULL)
459 {
460 LogError("Failed allocating new string .");
461 result = MU_FAILURE;
462 }
463 else
464 {
465 size_t token_index = 0;
466 size_t request_id_length = strlen(REQUEST_ID_PROPERTY);
467 result = MU_FAILURE;
468 while (STRING_TOKENIZER_get_next_token(token_handle, token_value, "/") == 0)
469 {
470 if (token_index == 3)
471 {
472 if (STRING_concat_with_STRING(method_name, token_value) != 0)
473 {
474 LogError("Failed STRING_concat_with_STRING.");
475 result = MU_FAILURE;
476 break;
477 }
478 }
479 else if (token_index == 4)
480 {
481 if (STRING_length(token_value) >= request_id_length)
482 {
483 const char* request_id_value = STRING_c_str(token_value);
484 if (memcmp(request_id_value, REQUEST_ID_PROPERTY, request_id_length) == 0)
485 {
486 if (STRING_concat(request_id, request_id_value + request_id_length) != 0)
487 {
488 LogError("Failed STRING_concat failed.");
489 result = MU_FAILURE;
490 }
491 else
492 {
493 result = 0;
494 }
495 break;
496 }
497 }
498 }
499 token_index++;
500 }
501 STRING_delete(token_value);
502 }
503 STRING_TOKENIZER_destroy(token_handle);
504 }
505
506 return result;
507}
508
509//
510// parseDeviceTwinTopicInfo parses information about a topic PUBLISH'd to this device/module.
511//
512static int parseDeviceTwinTopicInfo(const char* resp_topic, bool* patch_msg, size_t* request_id, int* status_code)
513{
514 int result;
515 STRING_TOKENIZER_HANDLE token_handle = STRING_TOKENIZER_create_from_char(resp_topic);
516 if (token_handle == NULL)
517 {
518 LogError("Failed creating token from device twin topic.");
519 result = MU_FAILURE;
520 *status_code = 0;
521 *request_id = 0;
522 *patch_msg = false;
523 }
524 else
525 {
526 STRING_HANDLE token_value;
527 if ((token_value = STRING_new()) == NULL)
528 {
529 LogError("Failed allocating new string .");
530 result = MU_FAILURE;
531 *status_code = 0;
532 *request_id = 0;
533 *patch_msg = false;
534 }
535 else
536 {
537 result = MU_FAILURE;
538 size_t token_count = 0;
539 while (STRING_TOKENIZER_get_next_token(token_handle, token_value, "/") == 0)
540 {
541 if (token_count == 2)
542 {
543 if (strcmp(STRING_c_str(token_value), "PATCH") == 0)
544 {
545 *patch_msg = true;
546 *status_code = 0;
547 *request_id = 0;
548 result = 0;
549 break;
550 }
551 else
552 {
553 *patch_msg = false;
554 }
555 }
556 else if (token_count == 3)
557 {
558 *status_code = (int)atol(STRING_c_str(token_value));
559 if (STRING_TOKENIZER_get_next_token(token_handle, token_value, "/?$rid=") == 0)
560 {
561 *request_id = (size_t)atol(STRING_c_str(token_value));
562 }
563 *patch_msg = false;
564 result = 0;
565 break;
566 }
567 token_count++;
568 }
569 STRING_delete(token_value);
570 }
571 STRING_TOKENIZER_destroy(token_handle);
572 }
573 return result;
574}
575
576//
577// retrieveTopicType translates an MQTT topic PUBLISH'd to this device/module into what type (e.g. twin, method, etc.) it represents.
578//
579static IOTHUB_IDENTITY_TYPE retrieveTopicType(const char* topic_resp, const char* input_queue)
580{
581 IOTHUB_IDENTITY_TYPE type;
582 if (InternStrnicmp(topic_resp, TOPIC_DEVICE_TWIN_PREFIX, sizeof(TOPIC_DEVICE_TWIN_PREFIX) - 1) == 0)
583 {
584 type = IOTHUB_TYPE_DEVICE_TWIN;
585 }
586 else if (InternStrnicmp(topic_resp, TOPIC_DEVICE_METHOD_PREFIX, sizeof(TOPIC_DEVICE_METHOD_PREFIX) - 1) == 0)
587 {
588 type = IOTHUB_TYPE_DEVICE_METHODS;
589 }
590 // input_queue contains additional "#" from subscribe, which we strip off on comparing incoming.
591 else if ((input_queue != NULL) && InternStrnicmp(topic_resp, input_queue, strlen(input_queue) - 1) == 0)
592 {
593 type = IOTHUB_TYPE_EVENT_QUEUE;
594 }
595 else
596 {
597 type = IOTHUB_TYPE_TELEMETRY;
598 }
599 return type;
600
601}
602
603//
604// notifyApplicationOfSendMessageComplete lets application know that messages in the iothubMsgList have completed (or should be considered failed) with confirmResult status.
605//
606static void notifyApplicationOfSendMessageComplete(IOTHUB_MESSAGE_LIST* iothubMsgList, PMQTTTRANSPORT_HANDLE_DATA transport_data, IOTHUB_CLIENT_CONFIRMATION_RESULT confirmResult)
607{
608 DLIST_ENTRY messageCompleted;
609 DList_InitializeListHead(&messageCompleted);
610 DList_InsertTailList(&messageCompleted, &(iothubMsgList->entry));
611 transport_data->transport_callbacks.send_complete_cb(&messageCompleted, confirmResult, transport_data->transport_ctx);
612}
613
614//
615// addUserPropertiesTouMqttMessage translates application properties in iothub_message_handle (set by the application with IoTHubMessage_SetProperty e.g.)
616// into a representation in the MQTT TOPIC topic_string.
617//
618static int addUserPropertiesTouMqttMessage(IOTHUB_MESSAGE_HANDLE iothub_message_handle, STRING_HANDLE topic_string, size_t* index_ptr, bool urlencode)
619{
620 int result = 0;
621 const char* const* propertyKeys;
622 const char* const* propertyValues;
623 size_t propertyCount;
624 size_t index = *index_ptr;
625 MAP_HANDLE properties_map = IoTHubMessage_Properties(iothub_message_handle);
626 if (properties_map != NULL)
627 {
628 if (Map_GetInternals(properties_map, &propertyKeys, &propertyValues, &propertyCount) != MAP_OK)
629 {
630 LogError("Failed to get the internals of the property map.");
631 result = MU_FAILURE;
632 }
633 else
634 {
635 if (propertyCount != 0)
636 {
637 for (index = 0; index < propertyCount && result == 0; index++)
638 {
639 if (urlencode)
640 {
641 STRING_HANDLE property_key = URL_EncodeString(propertyKeys[index]);
642 STRING_HANDLE property_value = URL_EncodeString(propertyValues[index]);
643 if ((property_key == NULL) || (property_value == NULL))
644 {
645 LogError("Failed URL Encoding properties");
646 result = MU_FAILURE;
647 }
648 else if (STRING_sprintf(topic_string, "%s=%s%s", STRING_c_str(property_key), STRING_c_str(property_value), propertyCount - 1 == index ? "" : PROPERTY_SEPARATOR) != 0)
649 {
650 LogError("Failed constructing property string.");
651 result = MU_FAILURE;
652 }
653 STRING_delete(property_key);
654 STRING_delete(property_value);
655 }
656 else
657 {
658 if (STRING_sprintf(topic_string, "%s=%s%s", propertyKeys[index], propertyValues[index], propertyCount - 1 == index ? "" : PROPERTY_SEPARATOR) != 0)
659 {
660 LogError("Failed constructing property string.");
661 result = MU_FAILURE;
662 }
663 }
664 }
665 }
666 }
667 }
668 *index_ptr = index;
669 return result;
670}
671
672//
673// addSystemPropertyToTopicString appends a given "system" property from iothub_message_handle (set by the application with APIs such as IoTHubMessage_SetMessageId,
674// IoTHubMessage_SetContentTypeSystemProperty, etc.) onto the MQTT TOPIC topic_string.
675//
676static int addSystemPropertyToTopicString(STRING_HANDLE topic_string, size_t index, const char* property_key, const char* property_value, bool urlencode)
677{
678 int result = 0;
679
680 if (urlencode)
681 {
682 STRING_HANDLE encoded_property_value = URL_EncodeString(property_value);
683 if (encoded_property_value == NULL)
684 {
685 LogError("Failed URL encoding %s.", property_key);
686 result = MU_FAILURE;
687 }
688 else if (STRING_sprintf(topic_string, "%s%%24.%s=%s", index == 0 ? "" : PROPERTY_SEPARATOR, property_key, STRING_c_str(encoded_property_value)) != 0)
689 {
690 LogError("Failed setting %s.", property_key);
691 result = MU_FAILURE;
692 }
693 STRING_delete(encoded_property_value);
694 }
695 else
696 {
697 if (STRING_sprintf(topic_string, "%s%%24.%s=%s", index == 0 ? "" : PROPERTY_SEPARATOR, property_key, property_value) != 0)
698 {
699 LogError("Failed setting %s.", property_key);
700 result = MU_FAILURE;
701 }
702 }
703 return result;
704}
705
706//
707// addSystemPropertyToTopicString appends all "system" property from iothub_message_handle (set by the application with APIs such as IoTHubMessage_SetMessageId,
708// IoTHubMessage_SetContentTypeSystemProperty, etc.) onto the MQTT TOPIC topic_string.
709//
710static int addSystemPropertiesTouMqttMessage(IOTHUB_MESSAGE_HANDLE iothub_message_handle, STRING_HANDLE topic_string, size_t* index_ptr, bool urlencode)
711{
712 int result = 0;
713 size_t index = *index_ptr;
714
715 bool is_security_msg = IoTHubMessage_IsSecurityMessage(iothub_message_handle);
716 /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_07_052: [ IoTHubTransport_MQTT_Common_DoWork shall check for the CorrelationId property and if found add the value as a system property in the format of $.cid=<id> ] */
717 const char* correlation_id = IoTHubMessage_GetCorrelationId(iothub_message_handle);
718 if (correlation_id != NULL)
719 {
720 result = addSystemPropertyToTopicString(topic_string, index, CORRELATION_ID_PROPERTY, correlation_id, urlencode);
721 index++;
722 }
723 /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_07_053: [ IoTHubTransport_MQTT_Common_DoWork shall check for the MessageId property and if found add the value as a system property in the format of $.mid=<id> ] */
724 if (result == 0)
725 {
726 const char* msg_id = IoTHubMessage_GetMessageId(iothub_message_handle);
727 if (msg_id != NULL)
728 {
729 result = addSystemPropertyToTopicString(topic_string, index, MESSAGE_ID_PROPERTY, msg_id, urlencode);
730 index++;
731 }
732 }
733 // Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_09_010: [ `IoTHubTransport_MQTT_Common_DoWork` shall check for the ContentType property and if found add the `value` as a system property in the format of `$.ct=<value>` ]
734 if (result == 0)
735 {
736 const char* content_type = IoTHubMessage_GetContentTypeSystemProperty(iothub_message_handle);
737 if (content_type != NULL)
738 {
739 result = addSystemPropertyToTopicString(topic_string, index, CONTENT_TYPE_PROPERTY, content_type, urlencode);
740 index++;
741 }
742 }
743 // Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_09_011: [ `IoTHubTransport_MQTT_Common_DoWork` shall check for the ContentEncoding property and if found add the `value` as a system property in the format of `$.ce=<value>` ]
744 if (result == 0)
745 {
746 const char* content_encoding = IoTHubMessage_GetContentEncodingSystemProperty(iothub_message_handle);
747 if (content_encoding != NULL)
748 {
749 // Security message require content encoding
750 result = addSystemPropertyToTopicString(topic_string, index, CONTENT_ENCODING_PROPERTY, content_encoding, is_security_msg ? true : urlencode);
751 index++;
752 }
753 }
754
755 if (result == 0)
756 {
757 const char* message_creation_time_utc = IoTHubMessage_GetMessageCreationTimeUtcSystemProperty(iothub_message_handle);
758 if (message_creation_time_utc != NULL)
759 {
760 result = addSystemPropertyToTopicString(topic_string, index, MESSAGE_CREATION_TIME_UTC, message_creation_time_utc, urlencode);
761 index++;
762 }
763 }
764
765 if (result == 0)
766 {
767 if (is_security_msg)
768 {
769 // The Security interface Id value must be encoded
770 if (addSystemPropertyToTopicString(topic_string, index++, SECURITY_INTERFACE_ID_MQTT, SECURITY_INTERFACE_ID_VALUE, true) != 0)
771 {
772 LogError("Failed setting Security interface id");
773 result = MU_FAILURE;
774 }
775 else
776 {
777 result = 0;
778 }
779 }
780 }
781 *index_ptr = index;
782 return result;
783}
784
785//
786// addDiagnosticPropertiesTouMqttMessage appends diagnostic data (as specified by IoTHubMessage_SetDiagnosticPropertyData) onto
787// the MQTT topic topic_string.
788//
789static int addDiagnosticPropertiesTouMqttMessage(IOTHUB_MESSAGE_HANDLE iothub_message_handle, STRING_HANDLE topic_string, size_t* index_ptr)
790{
791 int result = 0;
792 size_t index = *index_ptr;
793
794 // Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_09_014: [ `IoTHubTransport_MQTT_Common_DoWork` shall check for the diagnostic properties including diagid and diagCreationTimeUtc and if found both add them as system property in the format of `$.diagid` and `$.diagctx` respectively]
795 const IOTHUB_MESSAGE_DIAGNOSTIC_PROPERTY_DATA* diagnosticData = IoTHubMessage_GetDiagnosticPropertyData(iothub_message_handle);
796 if (diagnosticData != NULL)
797 {
798 const char* diag_id = diagnosticData->diagnosticId;
799 const char* creation_time_utc = diagnosticData->diagnosticCreationTimeUtc;
800 //diagid and creationtimeutc must be present/unpresent simultaneously
801 if (diag_id != NULL && creation_time_utc != NULL)
802 {
803 if (STRING_sprintf(topic_string, "%s%%24.%s=%s", index == 0 ? "" : PROPERTY_SEPARATOR, DIAGNOSTIC_ID_PROPERTY, diag_id) != 0)
804 {
805 LogError("Failed setting diagnostic id");
806 result = MU_FAILURE;
807 }
808 index++;
809
810 if (result == 0)
811 {
812 //construct diagnostic context, it should be urlencode(key1=value1,key2=value2)
813 STRING_HANDLE diagContextHandle = STRING_construct_sprintf("%s=%s", DIAGNOSTIC_CONTEXT_CREATION_TIME_UTC_PROPERTY, creation_time_utc);
814 if (diagContextHandle == NULL)
815 {
816 LogError("Failed constructing diagnostic context");
817 result = MU_FAILURE;
818 }
819 else
820 {
821 //Add other diagnostic context properties here if have more
822 STRING_HANDLE encodedContextValueHandle = URL_Encode(diagContextHandle);
823 const char* encodedContextValueString = NULL;
824 if (encodedContextValueHandle != NULL &&
825 (encodedContextValueString = STRING_c_str(encodedContextValueHandle)) != NULL)
826 {
827 if (STRING_sprintf(topic_string, "%s%%24.%s=%s", index == 0 ? "" : PROPERTY_SEPARATOR, DIAGNOSTIC_CONTEXT_PROPERTY, encodedContextValueString) != 0)
828 {
829 LogError("Failed setting diagnostic context");
830 result = MU_FAILURE;
831 }
832 STRING_delete(encodedContextValueHandle);
833 encodedContextValueHandle = NULL;
834 }
835 else
836 {
837 LogError("Failed encoding diagnostic context value");
838 result = MU_FAILURE;
839 }
840 STRING_delete(diagContextHandle);
841 diagContextHandle = NULL;
842 index++;
843 }
844 }
845 }
846 else if (diag_id != NULL || creation_time_utc != NULL)
847 {
848 // Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_09_015: [ `IoTHubTransport_MQTT_Common_DoWork` shall check whether diagid and diagCreationTimeUtc be present simultaneously, treat as error if not]
849 LogError("diagid and diagcreationtimeutc must be present simultaneously.");
850 result = MU_FAILURE;
851 }
852 }
853 return result;
854}
855
856//
857// addPropertiesTouMqttMessage adds user, "system", and diagnostic messages onto MQTT topic string. Note that "system" properties is a
858// construct of the SDK and IoT Hub. The MQTT protocol itself does not assign any significance to system and user properties (as opposed to AMQP).
859// The IOTHUB_MESSAGE_HANDLE structure however does have well-known properties (e.g. IoTHubMessage_SetMessageId) that the SDK treats as system
860// properties where we can automatically fill in the key value for in the key=value list.
861//
862static STRING_HANDLE addPropertiesTouMqttMessage(IOTHUB_MESSAGE_HANDLE iothub_message_handle, const char* eventTopic, bool urlencode)
863{
864 size_t index = 0;
865 STRING_HANDLE result = STRING_construct(eventTopic);
866 if (result == NULL)
867 {
868 LogError("Failed to create event topic string handle");
869 }
870 else if (addUserPropertiesTouMqttMessage(iothub_message_handle, result, &index, urlencode) != 0)
871 {
872 LogError("Failed adding Properties to uMQTT Message");
873 STRING_delete(result);
874 result = NULL;
875 }
876 else if (addSystemPropertiesTouMqttMessage(iothub_message_handle, result, &index, urlencode) != 0)
877 {
878 LogError("Failed adding System Properties to uMQTT Message");
879 STRING_delete(result);
880 result = NULL;
881 }
882 else if (addDiagnosticPropertiesTouMqttMessage(iothub_message_handle, result, &index) != 0)
883 {
884 LogError("Failed adding Diagnostic Properties to uMQTT Message");
885 STRING_delete(result);
886 result = NULL;
887 }
888
889 // Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_31_060: [ `IoTHubTransport_MQTT_Common_DoWork` shall check for the OutputName property and if found add the value as a system property in the format of $.on=<value> ]
890 if (result != NULL)
891 {
892 const char* output_name = IoTHubMessage_GetOutputName(iothub_message_handle);
893 if (output_name != NULL)
894 {
895 if (STRING_sprintf(result, "%s%%24.on=%s/", index == 0 ? "" : PROPERTY_SEPARATOR, output_name) != 0)
896 {
897 LogError("Failed setting output name.");
898 STRING_delete(result);
899 result = NULL;
900 }
901 index++;
902 }
903 }
904
905 return result;
906}
907
908//
909// publishTelemetryMsg invokes the umqtt layer to send a PUBLISH message.
910//
911static int publishTelemetryMsg(PMQTTTRANSPORT_HANDLE_DATA transport_data, MQTT_MESSAGE_DETAILS_LIST* mqttMsgEntry, const unsigned char* payload, size_t len)
912{
913 int result;
914 STRING_HANDLE msgTopic = addPropertiesTouMqttMessage(mqttMsgEntry->iotHubMessageEntry->messageHandle, STRING_c_str(transport_data->topic_MqttEvent), transport_data->auto_url_encode_decode);
915 if (msgTopic == NULL)
916 {
917 LogError("Failed adding properties to mqtt message");
918 result = MU_FAILURE;
919 }
920 else
921 {
922 MQTT_MESSAGE_HANDLE mqttMsg = mqttmessage_create_in_place(mqttMsgEntry->packet_id, STRING_c_str(msgTopic), DELIVER_AT_LEAST_ONCE, payload, len);
923 if (mqttMsg == NULL)
924 {
925 LogError("Failed creating mqtt message");
926 result = MU_FAILURE;
927 }
928 else
929 {
930 if (tickcounter_get_current_ms(transport_data->msgTickCounter, &mqttMsgEntry->msgPublishTime) != 0)
931 {
932 LogError("Failed retrieving tickcounter info");
933 result = MU_FAILURE;
934 }
935 else
936 {
937 if (mqtt_client_publish(transport_data->mqttClient, mqttMsg) != 0)
938 {
939 LogError("Failed attempting to publish mqtt message");
940 result = MU_FAILURE;
941 }
942 else
943 {
944 mqttMsgEntry->retryCount++;
945 result = 0;
946 }
947 }
948 mqttmessage_destroy(mqttMsg);
949 }
950 STRING_delete(msgTopic);
951 }
952 return result;
953}
954
955//
956// publishDeviceMethodResponseMsg invokes the umqtt to send a PUBLISH message that contains device method call results.
957//
958static int publishDeviceMethodResponseMsg(MQTTTRANSPORT_HANDLE_DATA* transport_data, int status_code, STRING_HANDLE request_id, const unsigned char* response, size_t response_size)
959{
960 int result;
961 uint16_t packet_id = getNextPacketId(transport_data);
962
963 STRING_HANDLE msg_topic = STRING_construct_sprintf(DEVICE_METHOD_RESPONSE_TOPIC, status_code, STRING_c_str(request_id));
964 if (msg_topic == NULL)
965 {
966 LogError("Failed constructing message topic.");
967 result = MU_FAILURE;
968 }
969 else
970 {
971 MQTT_MESSAGE_HANDLE mqtt_get_msg = mqttmessage_create_in_place(packet_id, STRING_c_str(msg_topic), DELIVER_AT_MOST_ONCE, response, response_size);
972 if (mqtt_get_msg == NULL)
973 {
974 LogError("Failed constructing mqtt message.");
975 result = MU_FAILURE;
976 }
977 else
978 {
979 if (mqtt_client_publish(transport_data->mqttClient, mqtt_get_msg) != 0)
980 {
981 LogError("Failed publishing to mqtt client.");
982 result = MU_FAILURE;
983 }
984 else
985 {
986 result = 0;
987 }
988 mqttmessage_destroy(mqtt_get_msg);
989 }
990 STRING_delete(msg_topic);
991 }
992 return result;
993}
994
995//
996// destroyDeviceTwinGetMsg frees msg_entry and any data associated with it.
997//
998static void destroyDeviceTwinGetMsg(MQTT_DEVICE_TWIN_ITEM* msg_entry)
999{
1000 free(msg_entry);
1001}
1002
1003//
1004// createDeviceTwinMsg allocates and fills in structure for MQTT_DEVICE_TWIN_ITEM.
1005//
1006static MQTT_DEVICE_TWIN_ITEM* createDeviceTwinMsg(MQTTTRANSPORT_HANDLE_DATA* transport_data, DEVICE_TWIN_MSG_TYPE device_twin_msg_type, uint32_t iothub_msg_id)
1007{
1008 MQTT_DEVICE_TWIN_ITEM* result;
1009 tickcounter_ms_t current_time;
1010
1011 if (tickcounter_get_current_ms(transport_data->msgTickCounter, &current_time) != 0)
1012 {
1013 LogError("Failed retrieving tickcounter info");
1014 result = NULL;
1015 }
1016 else if ((result = (MQTT_DEVICE_TWIN_ITEM*)malloc(sizeof(MQTT_DEVICE_TWIN_ITEM))) == NULL)
1017 {
1018 LogError("Failed allocating device twin data.");
1019 result = NULL;
1020 }
1021 else
1022 {
1023 memset(result, 0, sizeof(*result));
1024 result->msgCreationTime = current_time;
1025 result->packet_id = getNextPacketId(transport_data);
1026 result->iothub_msg_id = iothub_msg_id;
1027 result->device_twin_msg_type = device_twin_msg_type;
1028 }
1029
1030 return result;
1031}
1032
1033//
1034// publishDeviceTwinGetMsg invokes umqtt to PUBLISH a request to get the twin information.
1035//
1036static int publishDeviceTwinGetMsg(MQTTTRANSPORT_HANDLE_DATA* transport_data, MQTT_DEVICE_TWIN_ITEM* mqtt_info)
1037{
1038 int result;
1039
1040 STRING_HANDLE msg_topic = STRING_construct_sprintf(GET_PROPERTIES_TOPIC, mqtt_info->packet_id);
1041 if (msg_topic == NULL)
1042 {
1043 LogError("Failed constructing get Prop topic.");
1044 result = MU_FAILURE;
1045 }
1046 else
1047 {
1048 MQTT_MESSAGE_HANDLE mqtt_get_msg = mqttmessage_create(mqtt_info->packet_id, STRING_c_str(msg_topic), DELIVER_AT_MOST_ONCE, NULL, 0);
1049 if (mqtt_get_msg == NULL)
1050 {
1051 LogError("Failed constructing mqtt message.");
1052 result = MU_FAILURE;
1053 }
1054 else
1055 {
1056 if (mqtt_client_publish(transport_data->mqttClient, mqtt_get_msg) != 0)
1057 {
1058 LogError("Failed publishing to mqtt client.");
1059 result = MU_FAILURE;
1060 }
1061 else
1062 {
1063 DList_InsertTailList(&transport_data->ack_waiting_queue, &mqtt_info->entry);
1064 result = 0;
1065 }
1066 mqttmessage_destroy(mqtt_get_msg);
1067 }
1068 STRING_delete(msg_topic);
1069 }
1070
1071 return result;
1072}
1073
1074//
1075// sendPendingGetTwinRequests will send any queued up GetTwin requests during a DoWork loop.
1076//
1077static void sendPendingGetTwinRequests(PMQTTTRANSPORT_HANDLE_DATA transportData)
1078{
1079 PDLIST_ENTRY dev_twin_item = transportData->pending_get_twin_queue.Flink;
1080
1081 while (dev_twin_item != &transportData->pending_get_twin_queue)
1082 {
1083 DLIST_ENTRY saveListEntry;
1084 saveListEntry.Flink = dev_twin_item->Flink;
1085 MQTT_DEVICE_TWIN_ITEM* msg_entry = containingRecord(dev_twin_item, MQTT_DEVICE_TWIN_ITEM, entry);
1086 (void)DList_RemoveEntryList(dev_twin_item);
1087
1088 if (publishDeviceTwinGetMsg(transportData, msg_entry) != 0)
1089 {
1090 LogError("Failed sending pending get twin request");
1091 destroyDeviceTwinGetMsg(msg_entry);
1092 }
1093 else
1094 {
1095 transportData->device_twin_get_sent = true;
1096 }
1097
1098 dev_twin_item = saveListEntry.Flink;
1099 }
1100}
1101
1102//
1103// removeExpiredTwinRequestsFromList removes any requests that have timed out.
1104//
1105static void removeExpiredTwinRequestsFromList(PMQTTTRANSPORT_HANDLE_DATA transport_data, tickcounter_ms_t current_ms, DLIST_ENTRY* twin_list)
1106{
1107 PDLIST_ENTRY list_item = twin_list->Flink;
1108
1109 while (list_item != twin_list)
1110 {
1111 DLIST_ENTRY next_list_item;
1112 next_list_item.Flink = list_item->Flink;
1113 MQTT_DEVICE_TWIN_ITEM* msg_entry = containingRecord(list_item, MQTT_DEVICE_TWIN_ITEM, entry);
1114 bool item_timed_out = false;
1115
1116 if ((msg_entry->device_twin_msg_type == RETRIEVE_PROPERTIES) &&
1117 (((current_ms - msg_entry->msgCreationTime) / 1000) >= ON_DEMAND_GET_TWIN_REQUEST_TIMEOUT_SECS))
1118 {
1119 item_timed_out = true;
1120 if (msg_entry->userCallback != NULL)
1121 {
1122 msg_entry->userCallback(DEVICE_TWIN_UPDATE_COMPLETE, NULL, 0, msg_entry->userContext);
1123 }
1124 }
1125 else if ((msg_entry->device_twin_msg_type == REPORTED_STATE) &&
1126 (((current_ms - msg_entry->msgCreationTime) / 1000) >= TWIN_REPORT_UPDATE_TIMEOUT_SECS))
1127 {
1128 item_timed_out = true;
1129 transport_data->transport_callbacks.twin_rpt_state_complete_cb(msg_entry->iothub_msg_id, STATUS_CODE_TIMEOUT_VALUE, transport_data->transport_ctx);
1130 }
1131
1132 if (item_timed_out)
1133 {
1134 (void)DList_RemoveEntryList(list_item);
1135 destroyDeviceTwinGetMsg(msg_entry);
1136 }
1137
1138 list_item = next_list_item.Flink;
1139 }
1140
1141}
1142
1143//
1144// removeExpiredTwinRequests removes any requests that have timed out, regardless of how the request invoked.
1145//
1146static void removeExpiredTwinRequests(PMQTTTRANSPORT_HANDLE_DATA transport_data)
1147{
1148 tickcounter_ms_t current_ms;
1149
1150 if (tickcounter_get_current_ms(transport_data->msgTickCounter, &current_ms) == 0)
1151 {
1152 removeExpiredTwinRequestsFromList(transport_data, current_ms, &transport_data->pending_get_twin_queue);
1153 removeExpiredTwinRequestsFromList(transport_data, current_ms, &transport_data->ack_waiting_queue);
1154 }
1155}
1156
1157//
1158// publishDeviceTwinMsg invokes umqtt to PUBLISH a request for the twin.
1159//
1160static int publishDeviceTwinMsg(MQTTTRANSPORT_HANDLE_DATA* transport_data, IOTHUB_DEVICE_TWIN* device_twin_info, MQTT_DEVICE_TWIN_ITEM* mqtt_info)
1161{
1162 int result;
1163
1164 STRING_HANDLE msgTopic = STRING_construct_sprintf(REPORTED_PROPERTIES_TOPIC, mqtt_info->packet_id);
1165 if (msgTopic == NULL)
1166 {
1167 LogError("Failed constructing reported prop topic.");
1168 result = MU_FAILURE;
1169 }
1170 else
1171 {
1172 const CONSTBUFFER* data_buff;
1173 if ((data_buff = CONSTBUFFER_GetContent(device_twin_info->report_data_handle)) == NULL)
1174 {
1175 LogError("Failed retrieving buffer content");
1176 result = MU_FAILURE;
1177 }
1178 else
1179 {
1180 MQTT_MESSAGE_HANDLE mqtt_rpt_msg = mqttmessage_create_in_place(mqtt_info->packet_id, STRING_c_str(msgTopic), DELIVER_AT_MOST_ONCE, data_buff->buffer, data_buff->size);
1181 if (mqtt_rpt_msg == NULL)
1182 {
1183 LogError("Failed creating mqtt message");
1184 result = MU_FAILURE;
1185 }
1186 else
1187 {
1188 if (tickcounter_get_current_ms(transport_data->msgTickCounter, &mqtt_info->msgPublishTime) != 0)
1189 {
1190 LogError("Failed retrieving tickcounter info");
1191 result = MU_FAILURE;
1192 }
1193 else
1194 {
1195 if (mqtt_client_publish(transport_data->mqttClient, mqtt_rpt_msg) != 0)
1196 {
1197 LogError("Failed publishing mqtt message");
1198 result = MU_FAILURE;
1199 }
1200 else
1201 {
1202 mqtt_info->retryCount++;
1203 result = 0;
1204 }
1205 }
1206 mqttmessage_destroy(mqtt_rpt_msg);
1207 }
1208 }
1209 STRING_delete(msgTopic);
1210 }
1211 return result;
1212}
1213
1214//
1215// changeStateToSubscribeIfAllowed attempts to transition the state machine to subscribe, if our
1216// current state will allow it.
1217// This function does NOT immediately send the SUBSCRIBE however, instead setting things up
1218// so the next time DoWork is invoked then the SUBSCRIBE will happen.
1219//
1220static void changeStateToSubscribeIfAllowed(PMQTTTRANSPORT_HANDLE_DATA transport_data)
1221{
1222 if (transport_data->currPacketState != CONNACK_TYPE &&
1223 transport_data->currPacketState != CONNECT_TYPE &&
1224 transport_data->currPacketState != DISCONNECT_TYPE &&
1225 transport_data->currPacketState != PACKET_TYPE_ERROR)
1226 {
1227 transport_data->currPacketState = SUBSCRIBE_TYPE;
1228 }
1229}
1230
1231//
1232// subscribeToNotifyStateIfNeeded sets up to subscribe to the server.
1233//
1234static int subscribeToNotifyStateIfNeeded(PMQTTTRANSPORT_HANDLE_DATA transport_data)
1235{
1236 int result;
1237
1238 if (transport_data->topic_NotifyState == NULL)
1239 {
1240 transport_data->topic_NotifyState = STRING_construct(TOPIC_NOTIFICATION_STATE);
1241 if (transport_data->topic_NotifyState == NULL)
1242 {
1243 LogError("Failure: unable constructing notify state topic");
1244 result = MU_FAILURE;
1245 }
1246 else
1247 {
1248 transport_data->topics_ToSubscribe |= SUBSCRIBE_NOTIFICATION_STATE_TOPIC;
1249 result = 0;
1250 }
1251 }
1252 else
1253 {
1254 result = 0;
1255 }
1256
1257 if (result == 0)
1258 {
1259 changeStateToSubscribeIfAllowed(transport_data);
1260 }
1261
1262 return result;
1263}
1264
1265//
1266// isSystemProperty returns whether a given property name in an MQTT TOPIC published to this device/module
1267// is considered a "system". MQTT does not have a protocol defined concept of system properties. In this usage
1268// it implies that the IOTHUB_MESSAGE_HANDLE has an API for direct manipulation of the property (e.g. IoTHubMessage_GetMessageId).
1269//
1270static bool isSystemProperty(const char* tokenData)
1271{
1272 bool result = false;
1273 size_t propCount = sizeof(sysPropList) / sizeof(sysPropList[0]);
1274 size_t index = 0;
1275 size_t tokenDataLength = strlen(tokenData);
1276
1277 for (index = 0; index < propCount; index++)
1278 {
1279 if (tokenDataLength >= sysPropList[index].propLength &&
1280 memcmp(tokenData, sysPropList[index].propName, sysPropList[index].propLength) == 0)
1281 {
1282 result = true;
1283 break;
1284 }
1285 }
1286 return result;
1287}
1288
1289//
1290// addInputNamePropertyToMsg translates the input name (embedded in the MQTT topic name) into the IoTHubMessage handle
1291// such that the application can call IoTHubMessage_GetInputName() to retrieve this. This is only currently used
1292// in IoT Edge module to module message communication, so that this module receiving the message can know which module invoked in.
1293//
1294static int addInputNamePropertyToMsg(IOTHUB_MESSAGE_HANDLE IoTHubMessage, const char* topic_name)
1295{
1296 // Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_31_061: [ If the message is sent to an input queue, `IoTHubTransport_MQTT_Common_DoWork` shall parse out to the input queue name and store it in the message with IoTHubMessage_SetInputName ]
1297 // Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_31_062: [ If IoTHubTransport_MQTT_Common_DoWork receives a malformatted inputQueue, it shall fail ]
1298
1299 int result = MU_FAILURE;
1300 int number_tokens_read = 0;
1301
1302 STRING_TOKENIZER_HANDLE token_handle = STRING_TOKENIZER_create_from_char(topic_name);
1303 if (token_handle == NULL)
1304 {
1305 LogError("STRING_TOKENIZER_create_from_char failed\n");
1306 result = MU_FAILURE;
1307 }
1308 else
1309 {
1310 STRING_HANDLE token_value;
1311 if ((token_value = STRING_new()) == NULL)
1312 {
1313 LogError("Failed allocating token_value");
1314 }
1315 else
1316 {
1317 while (STRING_TOKENIZER_get_next_token(token_handle, token_value, "/") == 0)
1318 {
1319 number_tokens_read++;
1320 if (number_tokens_read == (slashes_to_reach_input_name + 1))
1321 {
1322 if ((IOTHUB_MESSAGE_OK != IoTHubMessage_SetInputName(IoTHubMessage, STRING_c_str(token_value))))
1323 {
1324 LogError("Failed adding input name to msg");
1325 result = MU_FAILURE;
1326 }
1327 else
1328 {
1329 result = 0;
1330 }
1331 break;
1332 }
1333 }
1334 }
1335 STRING_delete(token_value);
1336
1337 if (number_tokens_read != (slashes_to_reach_input_name + 1))
1338 {
1339 LogError("Not enough '/' to contain input name. Got %d, need at least %d", number_tokens_read, (slashes_to_reach_input_name + 1));
1340 result = MU_FAILURE;
1341 }
1342 STRING_TOKENIZER_destroy(token_handle);
1343 }
1344
1345 return result;
1346}
1347
1348//
1349// setMqttMessagePropertyIfPossible attempts to translate a "system" property into the IOTHUB_MESSAGE_HANDLE that will be provided to the
1350// application's callback.
1351//
1352static int setMqttMessagePropertyIfPossible(IOTHUB_MESSAGE_HANDLE IoTHubMessage, const char* propName, const char* propValue, size_t nameLen)
1353{
1354 // Not finding a system property to map to isn't an error.
1355 int result = 0;
1356
1357 if (nameLen > 5)
1358 {
1359 if (strcmp((const char*)&propName[nameLen - 5], MESSAGE_CREATION_TIME_UTC) == 0)
1360 {
1361 if (IoTHubMessage_SetMessageCreationTimeUtcSystemProperty(IoTHubMessage, propValue) != IOTHUB_MESSAGE_OK)
1362 {
1363 LogError("Failed to set IOTHUB_MESSAGE_HANDLE 'CreationTimeUtc' property.");
1364 result = MU_FAILURE;
1365 }
1366 return result;
1367 }
1368 }
1369
1370 if (nameLen > 4)
1371 {
1372 if (strcmp((const char*)&propName[nameLen - 4], CONNECTION_DEVICE_ID) == 0)
1373 {
1374 // Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_31_063: [ If type is IOTHUB_TYPE_TELEMETRY and the system property `$.cdid` is defined, its value shall be set on the IOTHUB_MESSAGE_HANDLE's ConnectionDeviceId property ]
1375 if (IoTHubMessage_SetConnectionDeviceId(IoTHubMessage, propValue) != IOTHUB_MESSAGE_OK)
1376 {
1377 LogError("Failed to set IOTHUB_MESSAGE_HANDLE 'messageId' property.");
1378 result = MU_FAILURE;
1379 }
1380 return result;
1381 }
1382 if (strcmp((const char*)&propName[nameLen - 4], CONNECTION_MODULE_ID_PROPERTY) == 0)
1383 {
1384 // Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_31_064: [ If type is IOTHUB_TYPE_TELEMETRY and the system property `$.cmid` is defined, its value shall be set on the IOTHUB_MESSAGE_HANDLE's ConnectionModuleId property ]
1385 if (IoTHubMessage_SetConnectionModuleId(IoTHubMessage, propValue) != IOTHUB_MESSAGE_OK)
1386 {
1387 LogError("Failed to set IOTHUB_MESSAGE_HANDLE 'correlationId' property.");
1388 result = MU_FAILURE;
1389 }
1390 return result;
1391 }
1392 }
1393 if (nameLen > 3)
1394 {
1395 if (strcmp((const char*)&propName[nameLen - 3], MESSAGE_ID_PROPERTY) == 0)
1396 {
1397 if (IoTHubMessage_SetMessageId(IoTHubMessage, propValue) != IOTHUB_MESSAGE_OK)
1398 {
1399 LogError("Failed to set IOTHUB_MESSAGE_HANDLE 'messageId' property.");
1400 result = MU_FAILURE;
1401 }
1402 return result;
1403 }
1404 else if (strcmp((const char*)&propName[nameLen - 3], CORRELATION_ID_PROPERTY) == 0)
1405 {
1406 if (IoTHubMessage_SetCorrelationId(IoTHubMessage, propValue) != IOTHUB_MESSAGE_OK)
1407 {
1408 LogError("Failed to set IOTHUB_MESSAGE_HANDLE 'correlationId' property.");
1409 result = MU_FAILURE;
1410 }
1411 return result;
1412 }
1413 else if (strcmp((const char*)&propName[nameLen - 3], MESSAGE_USER_ID) == 0)
1414 {
1415 if (IoTHubMessage_SetMessageUserIdSystemProperty(IoTHubMessage, propValue) != IOTHUB_MESSAGE_OK)
1416 {
1417 LogError("Failed to set IOTHUB_MESSAGE_HANDLE 'userId' property.");
1418 result = MU_FAILURE;
1419 }
1420 return result;
1421 }
1422 }
1423
1424 if (nameLen > 2)
1425 {
1426 // Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_09_012: [ If type is IOTHUB_TYPE_TELEMETRY and the system property `$.ct` is defined, its value shall be set on the IOTHUB_MESSAGE_HANDLE's ContentType property ]
1427 if (strcmp((const char*)&propName[nameLen - 2], CONTENT_TYPE_PROPERTY) == 0)
1428 {
1429 if (IoTHubMessage_SetContentTypeSystemProperty(IoTHubMessage, propValue) != IOTHUB_MESSAGE_OK)
1430 {
1431 LogError("Failed to set IOTHUB_MESSAGE_HANDLE 'customContentType' property.");
1432 result = MU_FAILURE;
1433 }
1434 return result;
1435 }
1436 // Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_09_013: [ If type is IOTHUB_TYPE_TELEMETRY and the system property `$.ce` is defined, its value shall be set on the IOTHUB_MESSAGE_HANDLE's ContentEncoding property ]
1437 else if (strcmp((const char*)&propName[nameLen - 2], CONTENT_ENCODING_PROPERTY) == 0)
1438 {
1439 if (IoTHubMessage_SetContentEncodingSystemProperty(IoTHubMessage, propValue) != IOTHUB_MESSAGE_OK)
1440 {
1441 LogError("Failed to set IOTHUB_MESSAGE_HANDLE 'contentEncoding' property.");
1442 result = MU_FAILURE;
1443 }
1444 return result;
1445 }
1446 }
1447
1448 return result;
1449}
1450
1451//
1452// extractMqttProperties parses the MQTT topic PUBLISH'd to this device/module, retrieves properties and fills out the
1453// IOTHUB_MESSAGE_HANDLE which will ultimately be delivered to the application callback.
1454//
1455static int extractMqttProperties(IOTHUB_MESSAGE_HANDLE IoTHubMessage, const char* topic_name, bool urldecode)
1456{
1457 int result;
1458 STRING_TOKENIZER_HANDLE token = STRING_TOKENIZER_create_from_char(topic_name);
1459 if (token != NULL)
1460 {
1461 MAP_HANDLE propertyMap = IoTHubMessage_Properties(IoTHubMessage);
1462 if (propertyMap == NULL)
1463 {
1464 LogError("Failure to retrieve IoTHubMessage_properties.");
1465 result = MU_FAILURE;
1466 }
1467 else
1468 {
1469 STRING_HANDLE output = STRING_new();
1470 if (output == NULL)
1471 {
1472 LogError("Failure to allocate STRING_new.");
1473 result = MU_FAILURE;
1474 }
1475 else
1476 {
1477 result = 0;
1478
1479 while (STRING_TOKENIZER_get_next_token(token, output, PROPERTY_SEPARATOR) == 0 && result == 0)
1480 {
1481 const char* tokenData = STRING_c_str(output);
1482 size_t tokenLen = strlen(tokenData);
1483 if (tokenData == NULL || tokenLen == 0)
1484 {
1485 break;
1486 }
1487 else
1488 {
1489 if (isSystemProperty(tokenData))
1490 {
1491 const char* iterator = tokenData;
1492 while (iterator != NULL && *iterator != '\0' && result == 0)
1493 {
1494 if (*iterator == '=')
1495 {
1496 char* propName = NULL;
1497 char* propValue = NULL;
1498 size_t nameLen = iterator - tokenData;
1499 size_t valLen = tokenLen - (nameLen + 1) + 1;
1500
1501 if ((propName = malloc(nameLen + 1)) == NULL || (propValue = malloc(valLen + 1)) == NULL)
1502 {
1503 LogError("Failed allocating property name (%p) and/or value (%p)", propName, propValue);
1504 free(propName);
1505 result = MU_FAILURE;
1506 }
1507 else
1508 {
1509 memcpy(propName, tokenData, nameLen);
1510 propName[nameLen] = '\0';
1511
1512 memcpy(propValue, iterator + 1, valLen);
1513 propValue[valLen] = '\0';
1514
1515 if (urldecode)
1516 {
1517 STRING_HANDLE propValue_decoded;
1518 if ((propValue_decoded = URL_DecodeString(propValue)) == NULL)
1519 {
1520 LogError("Failed to URL decode property value");
1521 result = MU_FAILURE;
1522 }
1523 else if (setMqttMessagePropertyIfPossible(IoTHubMessage, propName, STRING_c_str(propValue_decoded), nameLen) != 0)
1524 {
1525 LogError("Unable to set message property");
1526 result = MU_FAILURE;
1527 }
1528 STRING_delete(propValue_decoded);
1529 }
1530 else
1531 {
1532 if (setMqttMessagePropertyIfPossible(IoTHubMessage, propName, propValue, nameLen) != 0)
1533 {
1534 LogError("Unable to set message property");
1535 result = MU_FAILURE;
1536 }
1537 }
1538 free(propName);
1539 free(propValue);
1540 }
1541 break;
1542 }
1543 iterator++;
1544 }
1545 }
1546 else //User Properties
1547 {
1548 const char* iterator = tokenData;
1549 while (iterator != NULL && *iterator != '\0' && result == 0)
1550 {
1551 if (*iterator == '=')
1552 {
1553 char* propName = NULL;
1554 char* propValue = NULL;
1555 size_t nameLen = iterator - tokenData;
1556 size_t valLen = tokenLen - (nameLen + 1) + 1;
1557
1558 if ((propName = (char*)malloc(nameLen + 1)) == NULL || (propValue = (char*)malloc(valLen + 1)) == NULL)
1559 {
1560 free(propName);
1561 LogError("Failed allocating property information");
1562 result = MU_FAILURE;
1563 }
1564 else
1565 {
1566 memcpy(propName, tokenData, nameLen);
1567 propName[nameLen] = '\0';
1568
1569 memcpy(propValue, iterator + 1, valLen);
1570 propValue[valLen] = '\0';
1571
1572 if (urldecode)
1573 {
1574 STRING_HANDLE propName_decoded = URL_DecodeString(propName);
1575 STRING_HANDLE propValue_decoded = URL_DecodeString(propValue);
1576 if (propName_decoded == NULL || propValue_decoded == NULL)
1577 {
1578 LogError("Failed to URL decode property");
1579 result = MU_FAILURE;
1580 }
1581 else if (Map_AddOrUpdate(propertyMap, STRING_c_str(propName_decoded), STRING_c_str(propValue_decoded)) != MAP_OK)
1582 {
1583 LogError("Map_AddOrUpdate failed.");
1584 result = MU_FAILURE;
1585 }
1586 STRING_delete(propName_decoded);
1587 STRING_delete(propValue_decoded);
1588 }
1589 else
1590 {
1591 if (Map_AddOrUpdate(propertyMap, propName, propValue) != MAP_OK)
1592 {
1593 LogError("Map_AddOrUpdate failed.");
1594 result = MU_FAILURE;
1595 }
1596 }
1597 free(propName);
1598 free(propValue);
1599 }
1600 break;
1601 }
1602 iterator++;
1603 }
1604 }
1605 }
1606 }
1607 STRING_delete(output);
1608 }
1609 }
1610 STRING_TOKENIZER_destroy(token);
1611 }
1612 else
1613 {
1614 LogError("Unable to create Tokenizer object.");
1615 result = MU_FAILURE;
1616 }
1617 return result;
1618}
1619
1620//
1621// processTwinNotification processes device and module twin updates made by IoT Hub / IoT Edge.
1622//
1623static void processTwinNotification(PMQTTTRANSPORT_HANDLE_DATA transportData, MQTT_MESSAGE_HANDLE msgHandle, const char* topic_resp)
1624{
1625 size_t request_id;
1626 int status_code;
1627 bool notification_msg;
1628
1629 if (parseDeviceTwinTopicInfo(topic_resp, &notification_msg, &request_id, &status_code) != 0)
1630 {
1631 LogError("Failure: parsing device topic info");
1632 }
1633 else
1634 {
1635 const APP_PAYLOAD* payload = mqttmessage_getApplicationMsg(msgHandle);
1636 if (notification_msg)
1637 {
1638 transportData->transport_callbacks.twin_retrieve_prop_complete_cb(DEVICE_TWIN_UPDATE_PARTIAL, payload->message, payload->length, transportData->transport_ctx);
1639 }
1640 else
1641 {
1642 PDLIST_ENTRY dev_twin_item = transportData->ack_waiting_queue.Flink;
1643 while (dev_twin_item != &transportData->ack_waiting_queue)
1644 {
1645 DLIST_ENTRY saveListEntry;
1646 saveListEntry.Flink = dev_twin_item->Flink;
1647 MQTT_DEVICE_TWIN_ITEM* msg_entry = containingRecord(dev_twin_item, MQTT_DEVICE_TWIN_ITEM, entry);
1648 if (request_id == msg_entry->packet_id)
1649 {
1650 (void)DList_RemoveEntryList(dev_twin_item);
1651 if (msg_entry->device_twin_msg_type == RETRIEVE_PROPERTIES)
1652 {
1653 if (msg_entry->userCallback == NULL)
1654 {
1655 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_054: [ If type is IOTHUB_TYPE_DEVICE_TWIN, then on success if msg_type is RETRIEVE_PROPERTIES then mqttNotificationCallback shall call IoTHubClientCore_LL_RetrievePropertyComplete... ] */
1656 transportData->transport_callbacks.twin_retrieve_prop_complete_cb(DEVICE_TWIN_UPDATE_COMPLETE, payload->message, payload->length, transportData->transport_ctx);
1657 // Only after receiving device twin request should we start listening for patches.
1658 (void)subscribeToNotifyStateIfNeeded(transportData);
1659 }
1660 else
1661 {
1662 // This is a on-demand get twin request.
1663 msg_entry->userCallback(DEVICE_TWIN_UPDATE_COMPLETE, payload->message, payload->length, msg_entry->userContext);
1664 }
1665 }
1666 else
1667 {
1668 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_055: [ if device_twin_msg_type is not RETRIEVE_PROPERTIES then mqttNotificationCallback shall call IoTHubClientCore_LL_ReportedStateComplete ] */
1669 transportData->transport_callbacks.twin_rpt_state_complete_cb(msg_entry->iothub_msg_id, status_code, transportData->transport_ctx);
1670 // Only after receiving device twin request should we start listening for patches.
1671 (void)subscribeToNotifyStateIfNeeded(transportData);
1672 }
1673
1674 destroyDeviceTwinGetMsg(msg_entry);
1675 break;
1676 }
1677 dev_twin_item = saveListEntry.Flink;
1678 }
1679 }
1680 }
1681}
1682
1683//
1684// processDeviceMethodNotification processes a device and module method invocations made by IoT Hub / IoT Edge.
1685//
1686static void processDeviceMethodNotification(PMQTTTRANSPORT_HANDLE_DATA transportData, MQTT_MESSAGE_HANDLE msgHandle, const char* topic_resp)
1687{
1688 STRING_HANDLE method_name = STRING_new();
1689 if (method_name == NULL)
1690 {
1691 LogError("Failure: allocating method_name string value");
1692 }
1693 else
1694 {
1695 DEVICE_METHOD_INFO* dev_method_info = malloc(sizeof(DEVICE_METHOD_INFO));
1696 if (dev_method_info == NULL)
1697 {
1698 LogError("Failure: allocating DEVICE_METHOD_INFO object");
1699 }
1700 else
1701 {
1702 dev_method_info->request_id = STRING_new();
1703 if (dev_method_info->request_id == NULL)
1704 {
1705 LogError("Failure constructing request_id string");
1706 free(dev_method_info);
1707 }
1708 else if (retrievDeviceMethodRidInfo(topic_resp, method_name, dev_method_info->request_id) != 0)
1709 {
1710 LogError("Failure: retrieve device topic info");
1711 STRING_delete(dev_method_info->request_id);
1712 free(dev_method_info);
1713 }
1714 else
1715 {
1716 /* CodesSRS_IOTHUB_MQTT_TRANSPORT_07_053: [ If type is IOTHUB_TYPE_DEVICE_METHODS, then on success mqttNotificationCallback shall call IoTHubClientCore_LL_DeviceMethodComplete. ] */
1717 const APP_PAYLOAD* payload = mqttmessage_getApplicationMsg(msgHandle);
1718 if (transportData->transport_callbacks.method_complete_cb(STRING_c_str(method_name), payload->message, payload->length, (void*)dev_method_info, transportData->transport_ctx) != 0)
1719 {
1720 LogError("Failure: IoTHubClientCore_LL_DeviceMethodComplete");
1721 STRING_delete(dev_method_info->request_id);
1722 free(dev_method_info);
1723 }
1724 }
1725 }
1726 STRING_delete(method_name);
1727 }
1728}
1729
1730//
1731// processIncomingMessageNotification processes both C2D messages and messages sent from one IoT Edge module into this module
1732//
1733static void processIncomingMessageNotification(PMQTTTRANSPORT_HANDLE_DATA transportData, MQTT_MESSAGE_HANDLE msgHandle, const char* topic_resp, IOTHUB_IDENTITY_TYPE type)
1734{
1735 const APP_PAYLOAD* appPayload = mqttmessage_getApplicationMsg(msgHandle);
1736 IOTHUB_MESSAGE_HANDLE IoTHubMessage = IoTHubMessage_CreateFromByteArray(appPayload->message, appPayload->length);
1737 if (IoTHubMessage == NULL)
1738 {
1739 LogError("Failure: IotHub Message creation has failed.");
1740 }
1741 else if ((type == IOTHUB_TYPE_EVENT_QUEUE) && (addInputNamePropertyToMsg(IoTHubMessage, topic_resp) != 0))
1742 {
1743 LogError("failure adding input name to property.");
1744 }
1745 else if (extractMqttProperties(IoTHubMessage, topic_resp, transportData->auto_url_encode_decode) != 0)
1746 {
1747 LogError("failure extracting mqtt properties.");
1748 }
1749 else
1750 {
1751 MESSAGE_CALLBACK_INFO* messageData = (MESSAGE_CALLBACK_INFO*)malloc(sizeof(MESSAGE_CALLBACK_INFO));
1752 if (messageData == NULL)
1753 {
1754 LogError("malloc failed");
1755 }
1756 else
1757 {
1758 messageData->messageHandle = IoTHubMessage;
1759 messageData->transportContext = NULL;
1760
1761 if (type == IOTHUB_TYPE_EVENT_QUEUE)
1762 {
1763 // Codes_SRS_IOTHUB_MQTT_TRANSPORT_31_065: [ If type is IOTHUB_TYPE_TELEMETRY and sent to an input queue, then on success `mqttNotificationCallback` shall call `IoTHubClient_LL_MessageCallback`. ]
1764 if (!transportData->transport_callbacks.msg_input_cb(messageData, transportData->transport_ctx))
1765 {
1766 LogError("IoTHubClientCore_LL_MessageCallbackreturned false");
1767
1768 IoTHubMessage_Destroy(IoTHubMessage);
1769 free(messageData);
1770 }
1771 }
1772 else
1773 {
1774 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_056: [ If type is IOTHUB_TYPE_TELEMETRY, then on success mqttNotificationCallback shall call IoTHubClientCore_LL_MessageCallback. ] */
1775 if (!transportData->transport_callbacks.msg_cb(messageData, transportData->transport_ctx))
1776 {
1777 LogError("IoTHubClientCore_LL_MessageCallback returned false");
1778 IoTHubMessage_Destroy(IoTHubMessage);
1779 free(messageData);
1780 }
1781 }
1782 }
1783 }
1784}
1785
1786//
1787// mqttNotificationCallback processes incoming PUBLISH messages sent from Hub (or IoT Edge) to this device.
1788// This function is invoked by umqtt. It determines what topic the PUBLISH was directed at (e.g. Device Twin, Method, etc.),
1789// performs further parsing based on topic, and translates this call up to "iothub_client" layer for ultimate delivery to application callback.
1790//
1791static void mqttNotificationCallback(MQTT_MESSAGE_HANDLE msgHandle, void* callbackCtx)
1792{
1793 /* Tests_SRS_IOTHUB_MQTT_TRANSPORT_07_051: [ If msgHandle or callbackCtx is NULL, mqttNotificationCallback shall do nothing. ] */
1794 if (msgHandle != NULL && callbackCtx != NULL)
1795 {
1796 /* Tests_SRS_IOTHUB_MQTT_TRANSPORT_07_052: [ mqttNotificationCallback shall extract the topic Name from the MQTT_MESSAGE_HANDLE. ] */
1797 const char* topic_resp = mqttmessage_getTopicName(msgHandle);
1798 if (topic_resp == NULL)
1799 {
1800 LogError("Failure: NULL topic name encountered");
1801 }
1802 else
1803 {
1804 PMQTTTRANSPORT_HANDLE_DATA transportData = (PMQTTTRANSPORT_HANDLE_DATA)callbackCtx;
1805
1806 IOTHUB_IDENTITY_TYPE type = retrieveTopicType(topic_resp, STRING_c_str(transportData->topic_InputQueue));
1807 if (type == IOTHUB_TYPE_DEVICE_TWIN)
1808 {
1809 processTwinNotification(transportData, msgHandle, topic_resp);
1810 }
1811 else if (type == IOTHUB_TYPE_DEVICE_METHODS)
1812 {
1813 processDeviceMethodNotification(transportData, msgHandle, topic_resp);
1814 }
1815 else
1816 {
1817 processIncomingMessageNotification(transportData, msgHandle, topic_resp, type);
1818 }
1819 }
1820 }
1821}
1822
1823//
1824// mqttOperationCompleteCallback is invoked by umqtt when an operation initiated by the device completes.
1825// Examples of device initiated operations include PUBLISH, CONNECT, and SUBSCRIBE.
1826//
1827static void mqttOperationCompleteCallback(MQTT_CLIENT_HANDLE handle, MQTT_CLIENT_EVENT_RESULT actionResult, const void* msgInfo, void* callbackCtx)
1828{
1829 (void)handle;
1830 if (callbackCtx != NULL)
1831 {
1832 PMQTTTRANSPORT_HANDLE_DATA transport_data = (PMQTTTRANSPORT_HANDLE_DATA)callbackCtx;
1833
1834 switch (actionResult)
1835 {
1836 case MQTT_CLIENT_ON_PUBLISH_ACK:
1837 case MQTT_CLIENT_ON_PUBLISH_COMP:
1838 {
1839 const PUBLISH_ACK* puback = (const PUBLISH_ACK*)msgInfo;
1840 if (puback != NULL)
1841 {
1842 PDLIST_ENTRY currentListEntry = transport_data->telemetry_waitingForAck.Flink;
1843 while (currentListEntry != &transport_data->telemetry_waitingForAck)
1844 {
1845 MQTT_MESSAGE_DETAILS_LIST* mqttMsgEntry = containingRecord(currentListEntry, MQTT_MESSAGE_DETAILS_LIST, entry);
1846 DLIST_ENTRY saveListEntry;
1847 saveListEntry.Flink = currentListEntry->Flink;
1848
1849 if (puback->packetId == mqttMsgEntry->packet_id)
1850 {
1851 (void)DList_RemoveEntryList(currentListEntry); //First remove the item from Waiting for Ack List.
1852 notifyApplicationOfSendMessageComplete(mqttMsgEntry->iotHubMessageEntry, transport_data, IOTHUB_CLIENT_CONFIRMATION_OK);
1853 free(mqttMsgEntry);
1854 }
1855 currentListEntry = saveListEntry.Flink;
1856 }
1857 }
1858 else
1859 {
1860 LogError("Failure: MQTT_CLIENT_ON_PUBLISH_ACK publish_ack structure NULL.");
1861 }
1862 break;
1863 }
1864 case MQTT_CLIENT_ON_CONNACK:
1865 {
1866 const CONNECT_ACK* connack = (const CONNECT_ACK*)msgInfo;
1867 if (connack != NULL)
1868 {
1869 if (connack->returnCode == CONNECTION_ACCEPTED)
1870 {
1871 // The connect packet has been acked
1872 transport_data->currPacketState = CONNACK_TYPE;
1873 transport_data->isRecoverableError = true;
1874 transport_data->mqttClientStatus = MQTT_CLIENT_STATUS_CONNECTED;
1875
1876 // Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_09_008: [ Upon successful connection the retry control shall be reset using retry_control_reset() ]
1877 retry_control_reset(transport_data->retry_control_handle);
1878
1879 transport_data->transport_callbacks.connection_status_cb(IOTHUB_CLIENT_CONNECTION_AUTHENTICATED, IOTHUB_CLIENT_CONNECTION_OK, transport_data->transport_ctx);
1880 }
1881 else
1882 {
1883 if (connack->returnCode == CONN_REFUSED_SERVER_UNAVAIL)
1884 {
1885 transport_data->transport_callbacks.connection_status_cb(IOTHUB_CLIENT_CONNECTION_UNAUTHENTICATED, IOTHUB_CLIENT_CONNECTION_DEVICE_DISABLED, transport_data->transport_ctx);
1886 }
1887 else if (connack->returnCode == CONN_REFUSED_BAD_USERNAME_PASSWORD || connack->returnCode == CONN_REFUSED_ID_REJECTED)
1888 {
1889 transport_data->isRecoverableError = false;
1890 transport_data->transport_callbacks.connection_status_cb(IOTHUB_CLIENT_CONNECTION_UNAUTHENTICATED, IOTHUB_CLIENT_CONNECTION_BAD_CREDENTIAL, transport_data->transport_ctx);
1891 }
1892 else if (connack->returnCode == CONN_REFUSED_NOT_AUTHORIZED)
1893 {
1894 transport_data->transport_callbacks.connection_status_cb(IOTHUB_CLIENT_CONNECTION_UNAUTHENTICATED, IOTHUB_CLIENT_CONNECTION_BAD_CREDENTIAL, transport_data->transport_ctx);
1895 }
1896 else if (connack->returnCode == CONN_REFUSED_UNACCEPTABLE_VERSION)
1897 {
1898 transport_data->isRecoverableError = false;
1899 }
1900 LogError("Connection Not Accepted: 0x%x: %s", connack->returnCode, retrieveMqttReturnCodes(connack->returnCode));
1901 transport_data->mqttClientStatus = MQTT_CLIENT_STATUS_PENDING_CLOSE;
1902 transport_data->currPacketState = PACKET_TYPE_ERROR;
1903 }
1904 }
1905 else
1906 {
1907 LogError("MQTT_CLIENT_ON_CONNACK CONNACK parameter is NULL.");
1908 }
1909 break;
1910 }
1911 case MQTT_CLIENT_ON_SUBSCRIBE_ACK:
1912 {
1913 const SUBSCRIBE_ACK* suback = (const SUBSCRIBE_ACK*)msgInfo;
1914 if (suback != NULL)
1915 {
1916 size_t index = 0;
1917 for (index = 0; index < suback->qosCount; index++)
1918 {
1919 if (suback->qosReturn[index] == DELIVER_FAILURE)
1920 {
1921 LogError("Subscribe delivery failure of subscribe %lu", (unsigned long)index);
1922 }
1923 }
1924 // The subscribed packet has been acked
1925 transport_data->currPacketState = SUBACK_TYPE;
1926
1927 // Is this a twin message
1928 if (suback->packetId == transport_data->twin_resp_packet_id)
1929 {
1930 transport_data->twin_resp_sub_recv = true;
1931 }
1932 }
1933 else
1934 {
1935 LogError("Failure: MQTT_CLIENT_ON_SUBSCRIBE_ACK SUBSCRIBE_ACK parameter is NULL.");
1936 }
1937 break;
1938 }
1939 case MQTT_CLIENT_ON_PUBLISH_RECV:
1940 case MQTT_CLIENT_ON_PUBLISH_REL:
1941 {
1942 // Currently not used
1943 break;
1944 }
1945 case MQTT_CLIENT_ON_DISCONNECT:
1946 {
1947 // Close the client so we can reconnect again
1948 transport_data->mqttClientStatus = MQTT_CLIENT_STATUS_NOT_CONNECTED;
1949 break;
1950 }
1951 case MQTT_CLIENT_ON_UNSUBSCRIBE_ACK:
1952 case MQTT_CLIENT_ON_PING_RESPONSE:
1953 default:
1954 {
1955 break;
1956 }
1957 }
1958 }
1959}
1960
1961// Prior to creating a new connection, if we have an existing xioTransport that has been connected before
1962// we need to clear it now or else cached settings (especially TLS when communicating with HTTP proxies)
1963// will break reconnection attempt.
1964static void ResetConnectionIfNecessary(PMQTTTRANSPORT_HANDLE_DATA transport_data)
1965{
1966 if (transport_data->xioTransport != NULL && transport_data->conn_attempted)
1967 {
1968 OPTIONHANDLER_HANDLE options = xio_retrieveoptions(transport_data->xioTransport);
1969 setSavedTlsOptions(transport_data, options);
1970 DestroyXioTransport(transport_data);
1971 }
1972}
1973
//
// processDisconnectCallback is a callback invoked by umqtt to signal that the disconnection has completed.
//
//   ctx - pointer to an int flag that is set to 1; a NULL ctx is ignored.
//
static void processDisconnectCallback(void* ctx)
{
    int* disconnect_done = (int*)ctx;

    if (disconnect_done == NULL)
    {
        return;
    }
    *disconnect_done = 1;
}
1985
1986//
1987// DisconnectFromClient will tear down the existing MQTT connection, trying to gracefully send an MQTT DISCONNECT (with a timeout),
1988// destroy the underlying xio for network communication, and update the transport_data state machine appropriately.
1989//
1990//NOTE: After a call to DisconnectFromClient, determine if appropriate to also call
1991// transport_data->transport_callbacks.connection_status_cb().
1992static void DisconnectFromClient(PMQTTTRANSPORT_HANDLE_DATA transport_data)
1993{
1994 if (transport_data->currPacketState != DISCONNECT_TYPE)
1995 {
1996 if (!transport_data->isDestroyCalled)
1997 {
1998 OPTIONHANDLER_HANDLE options = xio_retrieveoptions(transport_data->xioTransport);
1999 setSavedTlsOptions(transport_data, options);
2000 }
2001 // Ensure the disconnect message is sent
2002 if (transport_data->mqttClientStatus == MQTT_CLIENT_STATUS_CONNECTED)
2003 {
2004 transport_data->disconnect_recv_flag = 0;
2005 (void)mqtt_client_disconnect(transport_data->mqttClient, processDisconnectCallback, &transport_data->disconnect_recv_flag);
2006 size_t disconnect_ctr = 0;
2007 do
2008 {
2009 mqtt_client_dowork(transport_data->mqttClient);
2010 disconnect_ctr++;
2011 ThreadAPI_Sleep(50);
2012 } while ((disconnect_ctr < MAX_DISCONNECT_VALUE) && (transport_data->disconnect_recv_flag == 0));
2013 }
2014 DestroyXioTransport(transport_data);
2015
2016 transport_data->device_twin_get_sent = false;
2017 transport_data->mqttClientStatus = MQTT_CLIENT_STATUS_NOT_CONNECTED;
2018 transport_data->currPacketState = DISCONNECT_TYPE;
2019 }
2020}
2021
2022//
2023// processErrorCallback is invoked by umqtt when an error has occurred.
2024//
2025static void processErrorCallback(MQTT_CLIENT_HANDLE handle, MQTT_CLIENT_EVENT_ERROR error, void* callbackCtx)
2026{
2027 (void)handle;
2028 if (callbackCtx != NULL)
2029 {
2030 PMQTTTRANSPORT_HANDLE_DATA transport_data = (PMQTTTRANSPORT_HANDLE_DATA)callbackCtx;
2031 switch (error)
2032 {
2033 case MQTT_CLIENT_CONNECTION_ERROR:
2034 {
2035 transport_data->transport_callbacks.connection_status_cb(IOTHUB_CLIENT_CONNECTION_UNAUTHENTICATED, IOTHUB_CLIENT_CONNECTION_NO_NETWORK, transport_data->transport_ctx);
2036 break;
2037 }
2038 case MQTT_CLIENT_COMMUNICATION_ERROR:
2039 {
2040 transport_data->transport_callbacks.connection_status_cb(IOTHUB_CLIENT_CONNECTION_UNAUTHENTICATED, IOTHUB_CLIENT_CONNECTION_COMMUNICATION_ERROR, transport_data->transport_ctx);
2041 break;
2042 }
2043 case MQTT_CLIENT_NO_PING_RESPONSE:
2044 {
2045 LogError("Mqtt Ping Response was not encountered. Reconnecting device...");
2046 transport_data->transport_callbacks.connection_status_cb(IOTHUB_CLIENT_CONNECTION_UNAUTHENTICATED, IOTHUB_CLIENT_CONNECTION_NO_PING_RESPONSE, transport_data->transport_ctx);
2047 break;
2048 }
2049 case MQTT_CLIENT_PARSE_ERROR:
2050 case MQTT_CLIENT_MEMORY_ERROR:
2051 case MQTT_CLIENT_UNKNOWN_ERROR:
2052 default:
2053 {
2054 LogError("INTERNAL ERROR: unexpected error value received %s", MU_ENUM_TO_STRING(MQTT_CLIENT_EVENT_ERROR, error));
2055 break;
2056 }
2057 }
2058 if (transport_data->mqttClientStatus != MQTT_CLIENT_STATUS_PENDING_CLOSE)
2059 {
2060 // We have encountered an mqtt protocol error in an non-closing state
2061 // The best course of action is to execute a shutdown of the mqtt/tls/socket
2062 // layer and then attempt to reconnect
2063 transport_data->mqttClientStatus = MQTT_CLIENT_STATUS_EXECUTE_DISCONNECT;
2064 }
2065 transport_data->currPacketState = PACKET_TYPE_ERROR;
2066 transport_data->device_twin_get_sent = false;
2067 if (transport_data->topic_MqttMessage != NULL)
2068 {
2069 transport_data->topics_ToSubscribe |= SUBSCRIBE_TELEMETRY_TOPIC;
2070 }
2071 if (transport_data->topic_GetState != NULL)
2072 {
2073 transport_data->topics_ToSubscribe |= SUBSCRIBE_GET_REPORTED_STATE_TOPIC;
2074 }
2075 if (transport_data->topic_NotifyState != NULL)
2076 {
2077 transport_data->topics_ToSubscribe |= SUBSCRIBE_NOTIFICATION_STATE_TOPIC;
2078 }
2079 if (transport_data->topic_DeviceMethods != NULL)
2080 {
2081 transport_data->topics_ToSubscribe |= SUBSCRIBE_DEVICE_METHOD_TOPIC;
2082 }
2083 if (transport_data->topic_InputQueue != NULL)
2084 {
2085 transport_data->topics_ToSubscribe |= SUBSCRIBE_INPUT_QUEUE_TOPIC;
2086 }
2087 }
2088 else
2089 {
2090 LogError("Failure: mqtt called back with null context.");
2091 }
2092}
2093
2094//
2095// SubscribeToMqttProtocol determines which topics we should SUBSCRIBE to, based on existing state, and then
2096// invokes the underlying umqtt layer to send the SUBSCRIBE across the network.
2097//
2098static void SubscribeToMqttProtocol(PMQTTTRANSPORT_HANDLE_DATA transport_data)
2099{
2100 if (transport_data->topics_ToSubscribe != UNSUBSCRIBE_FROM_TOPIC)
2101 {
2102 uint32_t topic_subscription = 0;
2103 size_t subscribe_count = 0;
2104 uint16_t packet_id = getNextPacketId(transport_data);
2105 SUBSCRIBE_PAYLOAD subscribe[SUBSCRIBE_TOPIC_COUNT];
2106 if ((transport_data->topic_MqttMessage != NULL) && (SUBSCRIBE_TELEMETRY_TOPIC & transport_data->topics_ToSubscribe))
2107 {
2108 subscribe[subscribe_count].subscribeTopic = STRING_c_str(transport_data->topic_MqttMessage);
2109 subscribe[subscribe_count].qosReturn = DELIVER_AT_LEAST_ONCE;
2110 topic_subscription |= SUBSCRIBE_TELEMETRY_TOPIC;
2111 subscribe_count++;
2112 }
2113 if ((transport_data->topic_GetState != NULL) && (SUBSCRIBE_GET_REPORTED_STATE_TOPIC & transport_data->topics_ToSubscribe))
2114 {
2115 subscribe[subscribe_count].subscribeTopic = STRING_c_str(transport_data->topic_GetState);
2116 subscribe[subscribe_count].qosReturn = DELIVER_AT_MOST_ONCE;
2117 topic_subscription |= SUBSCRIBE_GET_REPORTED_STATE_TOPIC;
2118 subscribe_count++;
2119 transport_data->twin_resp_packet_id = packet_id;
2120 }
2121 if ((transport_data->topic_NotifyState != NULL) && (SUBSCRIBE_NOTIFICATION_STATE_TOPIC & transport_data->topics_ToSubscribe))
2122 {
2123 subscribe[subscribe_count].subscribeTopic = STRING_c_str(transport_data->topic_NotifyState);
2124 subscribe[subscribe_count].qosReturn = DELIVER_AT_MOST_ONCE;
2125 topic_subscription |= SUBSCRIBE_NOTIFICATION_STATE_TOPIC;
2126 subscribe_count++;
2127 }
2128 if ((transport_data->topic_DeviceMethods != NULL) && (SUBSCRIBE_DEVICE_METHOD_TOPIC & transport_data->topics_ToSubscribe))
2129 {
2130 subscribe[subscribe_count].subscribeTopic = STRING_c_str(transport_data->topic_DeviceMethods);
2131 subscribe[subscribe_count].qosReturn = DELIVER_AT_MOST_ONCE;
2132 topic_subscription |= SUBSCRIBE_DEVICE_METHOD_TOPIC;
2133 subscribe_count++;
2134 }
2135 if ((transport_data->topic_InputQueue != NULL) && (SUBSCRIBE_INPUT_QUEUE_TOPIC & transport_data->topics_ToSubscribe))
2136 {
2137 subscribe[subscribe_count].subscribeTopic = STRING_c_str(transport_data->topic_InputQueue);
2138 subscribe[subscribe_count].qosReturn = DELIVER_AT_MOST_ONCE;
2139 topic_subscription |= SUBSCRIBE_INPUT_QUEUE_TOPIC;
2140 subscribe_count++;
2141 }
2142
2143 if (subscribe_count != 0)
2144 {
2145 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_016: [IoTHubTransport_MQTT_Common_Subscribe shall call mqtt_client_subscribe to subscribe to the Message Topic.] */
2146 if (mqtt_client_subscribe(transport_data->mqttClient, packet_id, subscribe, subscribe_count) != 0)
2147 {
2148 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_017: [Upon failure IoTHubTransport_MQTT_Common_Subscribe shall return a non-zero value.] */
2149 LogError("Failure: mqtt_client_subscribe returned error.");
2150 }
2151 else
2152 {
2153 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_018: [On success IoTHubTransport_MQTT_Common_Subscribe shall return 0.] */
2154 transport_data->topics_ToSubscribe &= ~topic_subscription;
2155 transport_data->currPacketState = SUBSCRIBE_TYPE;
2156 }
2157 }
2158 else
2159 {
2160 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_017: [Upon failure IoTHubTransport_MQTT_Common_Subscribe shall return a non-zero value.] */
2161 LogError("Failure: subscribe_topic is empty.");
2162 }
2163 transport_data->currPacketState = SUBSCRIBE_TYPE;
2164 }
2165 else
2166 {
2167 transport_data->currPacketState = PUBLISH_TYPE;
2168 }
2169}
2170
2171//
2172// RetrieveMessagePayload translates the payload set by the application in messageHandle into payload/length for sending across the network.
2173//
2174static bool RetrieveMessagePayload(IOTHUB_MESSAGE_HANDLE messageHandle, const unsigned char** payload, size_t* length)
2175{
2176 bool result;
2177 IOTHUBMESSAGE_CONTENT_TYPE contentType = IoTHubMessage_GetContentType(messageHandle);
2178 if (contentType == IOTHUBMESSAGE_BYTEARRAY)
2179 {
2180 if (IoTHubMessage_GetByteArray(messageHandle, &(*payload), length) != IOTHUB_MESSAGE_OK)
2181 {
2182 LogError("Failure result from IoTHubMessage_GetByteArray");
2183 result = false;
2184 *length = 0;
2185 }
2186 else
2187 {
2188 result = true;
2189 }
2190 }
2191 else if (contentType == IOTHUBMESSAGE_STRING)
2192 {
2193 *payload = (const unsigned char*)IoTHubMessage_GetString(messageHandle);
2194 if (*payload == NULL)
2195 {
2196 LogError("Failure result from IoTHubMessage_GetString");
2197 result = false;
2198 *length = 0;
2199 }
2200 else
2201 {
2202 *length = strlen((const char*)*payload);
2203 result = true;
2204 }
2205 }
2206 else
2207 {
2208 result = false;
2209 *length = 0;
2210 }
2211 return result;
2212}
2213
2214//
2215// ProcessPendingTelemetryMessages examines each telemetry message the device/module has sent that hasn't yet been PUBACK'd.
2216// For each message, it might:
2217// * Ignore it, if its timeout has not yet been reached.
2218// * Attempt to retry PUBLISH the message, if has remaining retries left.
2219// * Stop attempting to send the message. This will result in tearing down the underlying MQTT/TCP connection because it indicates
2220// something is wrong.
2221//
2222static void ProcessPendingTelemetryMessages(PMQTTTRANSPORT_HANDLE_DATA transport_data)
2223{
2224 PDLIST_ENTRY current_entry = transport_data->telemetry_waitingForAck.Flink;
2225 tickcounter_ms_t current_ms;
2226 (void)tickcounter_get_current_ms(transport_data->msgTickCounter, &current_ms);
2227 while (current_entry != &transport_data->telemetry_waitingForAck)
2228 {
2229 MQTT_MESSAGE_DETAILS_LIST* msg_detail_entry = containingRecord(current_entry, MQTT_MESSAGE_DETAILS_LIST, entry);
2230 DLIST_ENTRY nextListEntry;
2231 nextListEntry.Flink = current_entry->Flink;
2232
2233 if (((current_ms - msg_detail_entry->msgPublishTime) / 1000) > RESEND_TIMEOUT_VALUE_MIN)
2234 {
2235 if (msg_detail_entry->retryCount >= MAX_SEND_RECOUNT_LIMIT)
2236 {
2237 notifyApplicationOfSendMessageComplete(msg_detail_entry->iotHubMessageEntry, transport_data, IOTHUB_CLIENT_CONFIRMATION_MESSAGE_TIMEOUT);
2238 (void)DList_RemoveEntryList(current_entry);
2239 free(msg_detail_entry);
2240
2241 DisconnectFromClient(transport_data);
2242 if (!transport_data->isRetryExpiredCallbackCalled) // Only call once
2243 {
2244 transport_data->transport_callbacks.connection_status_cb(IOTHUB_CLIENT_CONNECTION_UNAUTHENTICATED, IOTHUB_CLIENT_CONNECTION_RETRY_EXPIRED, transport_data->transport_ctx);
2245 transport_data->isRetryExpiredCallbackCalled = true;
2246 }
2247 }
2248 else
2249 {
2250 // Ensure that the packet state is PUBLISH_TYPE and then attempt to send the message
2251 // again
2252 if (transport_data->currPacketState == PUBLISH_TYPE)
2253 {
2254 size_t messageLength;
2255 const unsigned char* messagePayload = NULL;
2256 if (!RetrieveMessagePayload(msg_detail_entry->iotHubMessageEntry->messageHandle, &messagePayload, &messageLength))
2257 {
2258 (void)DList_RemoveEntryList(current_entry);
2259 notifyApplicationOfSendMessageComplete(msg_detail_entry->iotHubMessageEntry, transport_data, IOTHUB_CLIENT_CONFIRMATION_ERROR);
2260 }
2261 else
2262 {
2263 if (publishTelemetryMsg(transport_data, msg_detail_entry, messagePayload, messageLength) != 0)
2264 {
2265 (void)DList_RemoveEntryList(current_entry);
2266 notifyApplicationOfSendMessageComplete(msg_detail_entry->iotHubMessageEntry, transport_data, IOTHUB_CLIENT_CONFIRMATION_ERROR);
2267 free(msg_detail_entry);
2268 }
2269 }
2270 }
2271 else
2272 {
2273 msg_detail_entry->retryCount++;
2274 msg_detail_entry->msgPublishTime = current_ms;
2275 }
2276 }
2277 }
2278 current_entry = nextListEntry.Flink;
2279 }
2280}
2281
2282//
2283// CreateTransportProviderIfNecessary will create the underlying xioTransport (which handles networking I/O) and
2284// set its options, assuming the xioTransport does not already exist.
2285//
2286static int CreateTransportProviderIfNecessary(PMQTTTRANSPORT_HANDLE_DATA transport_data)
2287{
2288 int result;
2289
2290 if (transport_data->xioTransport == NULL)
2291 {
2292 // construct address
2293 const char* hostAddress = STRING_c_str(transport_data->hostAddress);
2294 MQTT_TRANSPORT_PROXY_OPTIONS mqtt_proxy_options;
2295
2296 /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_01_011: [ If no `proxy_data` option has been set, NULL shall be passed as the argument `mqtt_transport_proxy_options` when calling the function `get_io_transport` passed in `IoTHubTransport_MQTT_Common__Create`. ]*/
2297 mqtt_proxy_options.host_address = transport_data->http_proxy_hostname;
2298 mqtt_proxy_options.port = transport_data->http_proxy_port;
2299 mqtt_proxy_options.username = transport_data->http_proxy_username;
2300 mqtt_proxy_options.password = transport_data->http_proxy_password;
2301
2302 /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_01_010: [ If the `proxy_data` option has been set, the proxy options shall be filled in the argument `mqtt_transport_proxy_options` when calling the function `get_io_transport` passed in `IoTHubTransport_MQTT_Common__Create` to obtain the underlying IO handle. ]*/
2303 transport_data->xioTransport = transport_data->get_io_transport(hostAddress, (transport_data->http_proxy_hostname == NULL) ? NULL : &mqtt_proxy_options);
2304 if (transport_data->xioTransport == NULL)
2305 {
2306 LogError("Unable to create the lower level TLS layer.");
2307 result = MU_FAILURE;
2308 }
2309 else
2310 {
2311 if (transport_data->saved_tls_options != NULL)
2312 {
2313 if (OptionHandler_FeedOptions(transport_data->saved_tls_options, transport_data->xioTransport) != OPTIONHANDLER_OK)
2314 {
2315 LogError("Failed feeding existing options to new TLS instance.");
2316 result = MU_FAILURE;
2317 }
2318 else
2319 {
2320 // The tlsio has the options, so our copy can be deleted
2321 setSavedTlsOptions(transport_data, NULL);
2322 result = 0;
2323 }
2324 }
2325 else if (IoTHubClient_Auth_Get_Credential_Type(transport_data->authorization_module) == IOTHUB_CREDENTIAL_TYPE_X509_ECC)
2326 {
2327 if (IoTHubClient_Auth_Set_xio_Certificate(transport_data->authorization_module, transport_data->xioTransport) != 0)
2328 {
2329 LogError("Unable to create the lower level TLS layer.");
2330 result = MU_FAILURE;
2331 }
2332 else
2333 {
2334 result = 0;
2335 }
2336 }
2337 else
2338 {
2339 result = 0;
2340 }
2341 }
2342 }
2343 else
2344 {
2345 result = 0;
2346 }
2347 return result;
2348}
2349
2350//
2351// buildClientId creates the MQTT ClientId of this device or module.
2352//
2353static STRING_HANDLE buildClientId(const char* device_id, const char* module_id)
2354{
2355 if (module_id == NULL)
2356 {
2357 return STRING_construct_sprintf("%s", device_id);
2358 }
2359 else
2360 {
2361 return STRING_construct_sprintf("%s/%s", device_id, module_id);
2362 }
2363}
2364
2365//
2366// buildConfigForUsernameStep2IfNeeded builds the MQTT username. IoT Hub uses the query string of the userName to optionally
2367// specify SDK information, product information optionally specified by the application, and optionally the PnP ModelId.
2368//
2369static int buildConfigForUsernameStep2IfNeeded(PMQTTTRANSPORT_HANDLE_DATA transport_data)
2370{
2371 int result;
2372
2373 if (!transport_data->isConnectUsernameSet)
2374 {
2375 STRING_HANDLE userName = NULL;
2376 STRING_HANDLE modelIdParameter = NULL;
2377 STRING_HANDLE urlEncodedModelId = NULL;
2378 const char* modelId = transport_data->transport_callbacks.get_model_id_cb(transport_data->transport_ctx);
2379 // TODO: The preview API version in SDK is only scoped to scenarios that require the modelId to be set.
2380 // https://github.com/Azure/azure-iot-sdk-c/issues/1547 tracks removing this once non-preview API versions support modelId.
2381 const char* apiVersion = IOTHUB_API_VERSION;
2382 const char* appSpecifiedProductInfo = transport_data->transport_callbacks.prod_info_cb(transport_data->transport_ctx);
2383 STRING_HANDLE productInfoEncoded = NULL;
2384
2385 if ((productInfoEncoded = URL_EncodeString((appSpecifiedProductInfo != NULL) ? appSpecifiedProductInfo : DEFAULT_IOTHUB_PRODUCT_IDENTIFIER)) == NULL)
2386 {
2387 LogError("Unable to UrlEncode productInfo");
2388 result = MU_FAILURE;
2389 }
2390 else if ((userName = STRING_construct_sprintf("%s?api-version=%s&DeviceClientType=%s", STRING_c_str(transport_data->configPassedThroughUsername), apiVersion, STRING_c_str(productInfoEncoded))) == NULL)
2391 {
2392 LogError("Failed constructing string");
2393 result = 0;
2394 }
2395 else if (modelId != NULL)
2396 {
2397 if ((urlEncodedModelId = URL_EncodeString(modelId)) == NULL)
2398 {
2399 LogError("Failed to URL encode the modelID string");
2400 result = MU_FAILURE;
2401 }
2402 else if ((modelIdParameter = STRING_construct_sprintf("&%s=%s", DT_MODEL_ID_TOKEN, STRING_c_str(urlEncodedModelId))) == NULL)
2403 {
2404 LogError("Cannot build modelID string");
2405 result = MU_FAILURE;
2406 }
2407 else if (STRING_concat_with_STRING(userName, modelIdParameter) != 0)
2408 {
2409 LogError("Failed to set modelID parameter in connect");
2410 result = MU_FAILURE;
2411 }
2412 else
2413 {
2414 result = 0;
2415 }
2416 }
2417 else
2418 {
2419 result = 0;
2420 }
2421
2422 if (result == 0)
2423 {
2424 STRING_delete(transport_data->configPassedThroughUsername);
2425 transport_data->configPassedThroughUsername = userName;
2426 userName = NULL;
2427 // setting connect string is only allowed once in the lifetime of the device client.
2428 transport_data->isConnectUsernameSet = true;
2429 }
2430
2431 STRING_delete(userName);
2432 STRING_delete(modelIdParameter);
2433 STRING_delete(urlEncodedModelId);
2434 STRING_delete(productInfoEncoded);
2435 }
2436 else
2437 {
2438 result = 0;
2439 }
2440
2441 return result;
2442}
2443
2444//
2445// SendMqttConnectMsg sends the MQTT CONNECT message across the network. This function may also
2446// perform security functionality for building up the required token (optionally invoking into DPS if configured to do so)
2447//
2448static int SendMqttConnectMsg(PMQTTTRANSPORT_HANDLE_DATA transport_data)
2449{
2450 int result;
2451
2452 char* sasToken = NULL;
2453 result = 0;
2454
2455 IOTHUB_CREDENTIAL_TYPE cred_type = IoTHubClient_Auth_Get_Credential_Type(transport_data->authorization_module);
2456 if (cred_type == IOTHUB_CREDENTIAL_TYPE_DEVICE_KEY || cred_type == IOTHUB_CREDENTIAL_TYPE_DEVICE_AUTH)
2457 {
2458 sasToken = IoTHubClient_Auth_Get_SasToken(transport_data->authorization_module, STRING_c_str(transport_data->devicesAndModulesPath), 0, NULL);
2459 if (sasToken == NULL)
2460 {
2461 LogError("failure getting sas token from IoTHubClient_Auth_Get_SasToken.");
2462 result = MU_FAILURE;
2463 }
2464 }
2465 else if (cred_type == IOTHUB_CREDENTIAL_TYPE_SAS_TOKEN)
2466 {
2467 SAS_TOKEN_STATUS token_status = IoTHubClient_Auth_Is_SasToken_Valid(transport_data->authorization_module);
2468 if (token_status == SAS_TOKEN_STATUS_INVALID)
2469 {
2470 transport_data->transport_callbacks.connection_status_cb(IOTHUB_CLIENT_CONNECTION_UNAUTHENTICATED, IOTHUB_CLIENT_CONNECTION_EXPIRED_SAS_TOKEN, transport_data->transport_ctx);
2471 result = MU_FAILURE;
2472 }
2473 else if (token_status == SAS_TOKEN_STATUS_FAILED)
2474 {
2475 transport_data->transport_callbacks.connection_status_cb(IOTHUB_CLIENT_CONNECTION_UNAUTHENTICATED, IOTHUB_CLIENT_CONNECTION_BAD_CREDENTIAL, transport_data->transport_ctx);
2476 result = MU_FAILURE;
2477 }
2478 else
2479 {
2480 sasToken = IoTHubClient_Auth_Get_SasToken(transport_data->authorization_module, NULL, 0, NULL);
2481 if (sasToken == NULL)
2482 {
2483 LogError("failure getting sas Token.");
2484 result = MU_FAILURE;
2485 }
2486 }
2487 }
2488
2489 if (result == 0)
2490 {
2491 STRING_HANDLE clientId;
2492 if (buildConfigForUsernameStep2IfNeeded(transport_data) != 0)
2493 {
2494 LogError("Failed to add optional connect parameters.");
2495 result = MU_FAILURE;
2496 }
2497 else if ((clientId = buildClientId(STRING_c_str(transport_data->device_id), STRING_c_str(transport_data->module_id))) == NULL)
2498 {
2499 LogError("Unable to allocate clientId");
2500 result = MU_FAILURE;
2501 }
2502 else
2503 {
2504 MQTT_CLIENT_OPTIONS options = { 0 };
2505 options.clientId = (char*)STRING_c_str(clientId);
2506 options.willMessage = NULL;
2507 options.username = (char*)STRING_c_str(transport_data->configPassedThroughUsername);
2508 if (sasToken != NULL)
2509 {
2510 options.password = sasToken;
2511 }
2512 options.keepAliveInterval = transport_data->keepAliveValue;
2513 options.useCleanSession = false;
2514 options.qualityOfServiceValue = DELIVER_AT_LEAST_ONCE;
2515
2516 if (CreateTransportProviderIfNecessary(transport_data) == 0)
2517 {
2518 transport_data->conn_attempted = true;
2519 if (mqtt_client_connect(transport_data->mqttClient, transport_data->xioTransport, &options) != 0)
2520 {
2521 LogError("failure connecting to address %s.", STRING_c_str(transport_data->hostAddress));
2522 result = MU_FAILURE;
2523 }
2524 else
2525 {
2526 transport_data->currPacketState = CONNECT_TYPE;
2527 transport_data->isRetryExpiredCallbackCalled = false;
2528 (void)tickcounter_get_current_ms(transport_data->msgTickCounter, &transport_data->mqtt_connect_time);
2529 result = 0;
2530 }
2531 }
2532 else
2533 {
2534 result = MU_FAILURE;
2535 }
2536
2537 if (sasToken != NULL)
2538 {
2539 free(sasToken);
2540 }
2541 STRING_delete(clientId);
2542 }
2543 }
2544 return result;
2545}
2546
2547//
2548// UpdateMqttConnectionStateIfNeeded is used for updating MQTT's underlying connection status during a DoWork loop.
2549// Among this function's responsibilities:
2550// * Attempt to establish an MQTT connection if one has not been already.
2551// * Retries failed connection, if in the correct state.
2552// * Processes deferred disconnect requests
2553// * Checks timeouts, for instance on connection establishment time as well as SaS token lifetime (if SAS used)
2554static int UpdateMqttConnectionStateIfNeeded(PMQTTTRANSPORT_HANDLE_DATA transport_data)
2555{
2556 int result = 0;
2557
2558 // Make sure we're not destroying the object
2559 if (!transport_data->isDestroyCalled)
2560 {
2561 RETRY_ACTION retry_action = RETRY_ACTION_RETRY_LATER;
2562
2563 // Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_09_007: [ IoTHubTransport_MQTT_Common_DoWork shall try to reconnect according to the current retry policy set ]
2564 if (transport_data->mqttClientStatus == MQTT_CLIENT_STATUS_NOT_CONNECTED && transport_data->isRecoverableError)
2565 {
2566 // Note: in case retry_control_should_retry fails, the reconnection shall be attempted anyway (defaulting to policy IOTHUB_CLIENT_RETRY_IMMEDIATE).
2567 if (!transport_data->conn_attempted || retry_control_should_retry(transport_data->retry_control_handle, &retry_action) != 0 || retry_action == RETRY_ACTION_RETRY_NOW)
2568 {
2569 if (tickcounter_get_current_ms(transport_data->msgTickCounter, &transport_data->connectTick) != 0)
2570 {
2571 transport_data->connectFailCount++;
2572 result = MU_FAILURE;
2573 }
2574 else
2575 {
2576 ResetConnectionIfNecessary(transport_data);
2577
2578 if (SendMqttConnectMsg(transport_data) != 0)
2579 {
2580 transport_data->connectFailCount++;
2581 result = MU_FAILURE;
2582 }
2583 else
2584 {
2585 transport_data->mqttClientStatus = MQTT_CLIENT_STATUS_CONNECTING;
2586 transport_data->connectFailCount = 0;
2587 result = 0;
2588 }
2589 }
2590 }
2591 else if (retry_action == RETRY_ACTION_STOP_RETRYING)
2592 {
2593 // Set callback if retry expired
2594 if (!transport_data->isRetryExpiredCallbackCalled)
2595 {
2596 transport_data->transport_callbacks.connection_status_cb(IOTHUB_CLIENT_CONNECTION_UNAUTHENTICATED, IOTHUB_CLIENT_CONNECTION_RETRY_EXPIRED, transport_data->transport_ctx);
2597 transport_data->isRetryExpiredCallbackCalled = true;
2598 }
2599 result = MU_FAILURE;
2600 }
2601 else if (retry_action == RETRY_ACTION_RETRY_LATER)
2602 {
2603 result = MU_FAILURE;
2604 }
2605 }
2606 else if (transport_data->mqttClientStatus == MQTT_CLIENT_STATUS_EXECUTE_DISCONNECT)
2607 {
2608 // Need to disconnect from client
2609 DisconnectFromClient(transport_data);
2610 result = 0;
2611 }
2612 // Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_09_001: [ IoTHubTransport_MQTT_Common_DoWork shall trigger reconnection if the mqtt_client_connect does not complete within `keepalive` seconds]
2613 else if (transport_data->mqttClientStatus == MQTT_CLIENT_STATUS_CONNECTING)
2614 {
2615 tickcounter_ms_t current_time;
2616 if (tickcounter_get_current_ms(transport_data->msgTickCounter, &current_time) != 0)
2617 {
2618 LogError("failed verifying MQTT_CLIENT_STATUS_CONNECTING timeout");
2619 result = MU_FAILURE;
2620 }
2621 else if ((current_time - transport_data->mqtt_connect_time) / 1000 > transport_data->connect_timeout_in_sec)
2622 {
2623 LogError("mqtt_client timed out waiting for CONNACK");
2624 transport_data->currPacketState = PACKET_TYPE_ERROR;
2625 DisconnectFromClient(transport_data);
2626 result = MU_FAILURE;
2627 }
2628 }
2629 else if (transport_data->mqttClientStatus == MQTT_CLIENT_STATUS_CONNECTED)
2630 {
2631 // We are connected and not being closed, so does SAS need to reconnect?
2632 tickcounter_ms_t current_time;
2633 if (tickcounter_get_current_ms(transport_data->msgTickCounter, &current_time) != 0)
2634 {
2635 transport_data->connectFailCount++;
2636 result = MU_FAILURE;
2637 }
2638 else
2639 {
2640 IOTHUB_CREDENTIAL_TYPE cred_type = IoTHubClient_Auth_Get_Credential_Type(transport_data->authorization_module);
2641 // If the credential type is not an x509 certificate then we shall renew the Sas_Token
2642 if (cred_type != IOTHUB_CREDENTIAL_TYPE_X509 && cred_type != IOTHUB_CREDENTIAL_TYPE_X509_ECC)
2643 {
2644 uint64_t sas_token_expiry = IoTHubClient_Auth_Get_SasToken_Expiry(transport_data->authorization_module);
2645 if ((current_time - transport_data->mqtt_connect_time) / 1000 > (sas_token_expiry*SAS_REFRESH_MULTIPLIER))
2646 {
2647 /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_07_058: [ If the sas token has timed out IoTHubTransport_MQTT_Common_DoWork shall disconnect from the mqtt client and destroy the transport information and wait for reconnect. ] */
2648 DisconnectFromClient(transport_data);
2649
2650 transport_data->transport_callbacks.connection_status_cb(IOTHUB_CLIENT_CONNECTION_UNAUTHENTICATED, IOTHUB_CLIENT_CONNECTION_EXPIRED_SAS_TOKEN, transport_data->transport_ctx);
2651 transport_data->currPacketState = UNKNOWN_TYPE;
2652 if (transport_data->topic_MqttMessage != NULL)
2653 {
2654 transport_data->topics_ToSubscribe |= SUBSCRIBE_TELEMETRY_TOPIC;
2655 }
2656 if (transport_data->topic_GetState != NULL)
2657 {
2658 transport_data->topics_ToSubscribe |= SUBSCRIBE_GET_REPORTED_STATE_TOPIC;
2659 }
2660 if (transport_data->topic_NotifyState != NULL)
2661 {
2662 transport_data->topics_ToSubscribe |= SUBSCRIBE_NOTIFICATION_STATE_TOPIC;
2663 }
2664 if (transport_data->topic_DeviceMethods != NULL)
2665 {
2666 transport_data->topics_ToSubscribe |= SUBSCRIBE_DEVICE_METHOD_TOPIC;
2667 }
2668 if (transport_data->topic_InputQueue != NULL)
2669 {
2670 transport_data->topics_ToSubscribe |= SUBSCRIBE_INPUT_QUEUE_TOPIC;
2671 }
2672 }
2673 }
2674 }
2675 }
2676 }
2677 return result;
2678}
2679
2680// At handle creation time, we don't have all the fields required for building up the user name (e.g. productID)
2681// We build up as much of the string as we can at this point because we do not store upperConfig after initialization.
2682// In buildConfigForUsernameStep2IfNeeded, only immediately before we do CONNECT itself, do we complete building up this string.
2683static STRING_HANDLE buildConfigForUsernameStep1(const IOTHUB_CLIENT_CONFIG* upperConfig, const char* moduleId)
2684{
2685 if (moduleId == NULL)
2686 {
2687 return STRING_construct_sprintf("%s.%s/%s/", upperConfig->iotHubName, upperConfig->iotHubSuffix, upperConfig->deviceId);
2688 }
2689 else
2690 {
2691 return STRING_construct_sprintf("%s.%s/%s/%s/", upperConfig->iotHubName, upperConfig->iotHubSuffix, upperConfig->deviceId, moduleId);
2692 }
2693}
2694
2695//
2696// buildMqttEventString creates the MQTT topic for this device (and optionally module) to PUBLISH telemetry to.
2697//
2698static STRING_HANDLE buildMqttEventString(const char* device_id, const char* module_id)
2699{
2700 if (module_id == NULL)
2701 {
2702 return STRING_construct_sprintf(TOPIC_DEVICE_DEVICE, device_id);
2703 }
2704 else
2705 {
2706 return STRING_construct_sprintf(TOPIC_DEVICE_DEVICE_MODULE, device_id, module_id);
2707 }
2708}
2709
2710//
2711// buildDevicesAndModulesPath builds the path used when generating a SaS token for this request.
2712//
2713static STRING_HANDLE buildDevicesAndModulesPath(const IOTHUB_CLIENT_CONFIG* upperConfig, const char* moduleId)
2714{
2715 if (moduleId == NULL)
2716 {
2717 return STRING_construct_sprintf("%s.%s/devices/%s", upperConfig->iotHubName, upperConfig->iotHubSuffix, upperConfig->deviceId);
2718 }
2719 else
2720 {
2721 return STRING_construct_sprintf("%s.%s/devices/%s/modules/%s", upperConfig->iotHubName, upperConfig->iotHubSuffix, upperConfig->deviceId, moduleId);
2722 }
2723}
2724
2725//
2726// buildTopicMqttMsg builds the MQTT topic that is used for C2D messages sent to a device or module-to-module messages for a module running in IoT Edge
2727//
2728static STRING_HANDLE buildTopicMqttMsg(const char* device_id, const char* module_id)
2729{
2730 if (module_id == NULL)
2731 {
2732 return STRING_construct_sprintf(TOPIC_DEVICE_MSG, device_id);
2733 }
2734 else
2735 {
2736 return STRING_construct_sprintf(TOPIC_DEVICE_MODULE_MSG, device_id, module_id);
2737 }
2738}
2739
//
// checkModuleIdsEqual verifies that module Ids coming from different upper layers are equal.
// Two NULL ids are considered equal; a NULL id never equals a non-NULL id; two non-NULL ids
// are equal when strcmp reports them identical (case sensitive).
//
static bool checkModuleIdsEqual(const char* transportModuleId, const char* deviceModuleId)
{
    if ((transportModuleId == NULL) || (deviceModuleId == NULL))
    {
        // At least one id is absent: they match only when both are absent.
        return (transportModuleId == NULL) && (deviceModuleId == NULL);
    }

    return strcmp(transportModuleId, deviceModuleId) == 0;
}
2762
2763//
2764// InitializeTransportHandleData creates a MQTTTRANSPORT_HANDLE_DATA.
2765//
2766static PMQTTTRANSPORT_HANDLE_DATA InitializeTransportHandleData(const IOTHUB_CLIENT_CONFIG* upperConfig, PDLIST_ENTRY waitingToSend, IOTHUB_AUTHORIZATION_HANDLE auth_module, const char* moduleId)
2767{
2768 PMQTTTRANSPORT_HANDLE_DATA state = (PMQTTTRANSPORT_HANDLE_DATA)malloc(sizeof(MQTTTRANSPORT_HANDLE_DATA));
2769 if (state == NULL)
2770 {
2771 LogError("Could not create MQTT transport state. Memory allocation failed.");
2772 }
2773 else
2774 {
2775 memset(state, 0, sizeof(MQTTTRANSPORT_HANDLE_DATA));
2776 if ((state->msgTickCounter = tickcounter_create()) == NULL)
2777 {
2778 LogError("Invalid Argument: iotHubName is empty");
2779 freeTransportHandleData(state);
2780 state = NULL;
2781 }
2782 // Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_09_005: [ MQTT transport shall use EXPONENTIAL_WITH_BACK_OFF as default retry policy ]
2783 else if ((state->retry_control_handle = retry_control_create(DEFAULT_RETRY_POLICY, DEFAULT_RETRY_TIMEOUT_IN_SECONDS)) == NULL)
2784 {
2785 LogError("Failed creating default retry control");
2786 freeTransportHandleData(state);
2787 state = NULL;
2788 }
2789 else if ((state->device_id = STRING_construct(upperConfig->deviceId)) == NULL)
2790 {
2791 LogError("failure constructing device_id.");
2792 freeTransportHandleData(state);
2793 state = NULL;
2794 }
2795 else if ((moduleId != NULL) && ((state->module_id = STRING_construct(moduleId)) == NULL))
2796 {
2797 LogError("failure constructing module_id.");
2798 freeTransportHandleData(state);
2799 state = NULL;
2800 }
2801 else if ((state->devicesAndModulesPath = buildDevicesAndModulesPath(upperConfig, moduleId)) == NULL)
2802 {
2803 LogError("failure constructing devicesPath.");
2804 freeTransportHandleData(state);
2805 state = NULL;
2806 }
2807 else
2808 {
2809 if ((state->topic_MqttEvent = buildMqttEventString(upperConfig->deviceId, moduleId)) == NULL)
2810 {
2811 LogError("Could not create topic_MqttEvent for MQTT");
2812 freeTransportHandleData(state);
2813 state = NULL;
2814 }
2815 else
2816 {
2817 state->mqttClient = mqtt_client_init(mqttNotificationCallback, mqttOperationCompleteCallback, state, processErrorCallback, state);
2818 if (state->mqttClient == NULL)
2819 {
2820 LogError("failure initializing mqtt client.");
2821 freeTransportHandleData(state);
2822 state = NULL;
2823 }
2824 else
2825 {
2826 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_008: [If the upperConfig contains a valid protocolGatewayHostName value this shall be used for the hostname, otherwise the hostname shall be constructed using the iothubName and iothubSuffix.] */
2827 if (upperConfig->protocolGatewayHostName == NULL)
2828 {
2829 state->hostAddress = STRING_construct_sprintf("%s.%s", upperConfig->iotHubName, upperConfig->iotHubSuffix);
2830 }
2831 else
2832 {
2833 state->hostAddress = STRING_construct(upperConfig->protocolGatewayHostName);
2834 }
2835
2836 if (state->hostAddress == NULL)
2837 {
2838 LogError("failure constructing host address.");
2839 freeTransportHandleData(state);
2840 state = NULL;
2841 }
2842 else if ((state->configPassedThroughUsername = buildConfigForUsernameStep1(upperConfig, moduleId)) == NULL)
2843 {
2844 freeTransportHandleData(state);
2845 state = NULL;
2846 }
2847 else
2848 {
2849 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_010: [IoTHubTransport_MQTT_Common_Create shall allocate memory to save its internal state where all topics, hostname, device_id, device_key, sasTokenSr and client handle shall be saved.] */
2850 DList_InitializeListHead(&(state->telemetry_waitingForAck));
2851 DList_InitializeListHead(&(state->ack_waiting_queue));
2852 DList_InitializeListHead(&(state->pending_get_twin_queue));
2853 state->mqttClientStatus = MQTT_CLIENT_STATUS_NOT_CONNECTED;
2854 state->isRecoverableError = true;
2855 state->packetId = 1;
2856 state->waitingToSend = waitingToSend;
2857 state->currPacketState = CONNECT_TYPE;
2858 state->keepAliveValue = DEFAULT_MQTT_KEEPALIVE;
2859 state->connect_timeout_in_sec = DEFAULT_CONNACK_TIMEOUT;
2860 state->topics_ToSubscribe = UNSUBSCRIBE_FROM_TOPIC;
2861 srand((unsigned int)get_time(NULL));
2862 state->authorization_module = auth_module;
2863
2864 state->isDestroyCalled = false;
2865 state->isRetryExpiredCallbackCalled = false;
2866 state->isRegistered = false;
2867 state->device_twin_get_sent = false;
2868 state->xioTransport = NULL;
2869 state->portNum = 0;
2870 state->connectFailCount = 0;
2871 state->connectTick = 0;
2872 state->topic_MqttMessage = NULL;
2873 state->topic_GetState = NULL;
2874 state->topic_NotifyState = NULL;
2875 state->topic_DeviceMethods = NULL;
2876 state->topic_InputQueue = NULL;
2877 state->log_trace = state->raw_trace = false;
2878 state->isConnectUsernameSet = false;
2879 state->auto_url_encode_decode = false;
2880 state->conn_attempted = false;
2881 }
2882 }
2883 }
2884 }
2885 }
2886 return state;
2887}
2888
2889//
2890// ProcessSubackDoWork processes state transitions responding to a SUBACK packet.
2891// This does NOT occur once we receive the SUBACK packet immediately; instead the work is
2892// deferred to the DoWork loop.
2893//
2894static void ProcessSubackDoWork(PMQTTTRANSPORT_HANDLE_DATA transport_data)
2895{
2896 if ((transport_data->topic_NotifyState != NULL || transport_data->topic_GetState != NULL) &&
2897 !transport_data->device_twin_get_sent)
2898 {
2899 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_055: [ IoTHubTransport_MQTT_Common_DoWork shall send a device twin get property message upon successfully retrieving a SUBACK on device twin topics. ] */
2900 MQTT_DEVICE_TWIN_ITEM* mqtt_info;
2901
2902 if ((mqtt_info = createDeviceTwinMsg(transport_data, RETRIEVE_PROPERTIES, 0)) == NULL)
2903 {
2904 LogError("Failure: could not create message for twin get command");
2905 }
2906 else if (publishDeviceTwinGetMsg(transport_data, mqtt_info) == 0)
2907 {
2908 transport_data->device_twin_get_sent = true;
2909 }
2910 else
2911 {
2912 LogError("Failure: sending device twin get command.");
2913 destroyDeviceTwinGetMsg(mqtt_info);
2914 }
2915 }
2916
2917 // Publish can be called now and in any event we need to transition out of this state.
2918 transport_data->currPacketState = PUBLISH_TYPE;
2919}
2920
2921//
2922// ProcessPublishStateDoWork traverses all messages waiting to be sent and attempts to PUBLISH them.
2923//
2924static void ProcessPublishStateDoWork(PMQTTTRANSPORT_HANDLE_DATA transport_data)
2925{
2926 PDLIST_ENTRY currentListEntry = transport_data->waitingToSend->Flink;
2927 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_027: [IoTHubTransport_MQTT_Common_DoWork shall inspect the "waitingToSend" DLIST passed in config structure.] */
2928 while (currentListEntry != transport_data->waitingToSend)
2929 {
2930 IOTHUB_MESSAGE_LIST* iothubMsgList = containingRecord(currentListEntry, IOTHUB_MESSAGE_LIST, entry);
2931 DLIST_ENTRY savedFromCurrentListEntry;
2932 savedFromCurrentListEntry.Flink = currentListEntry->Flink;
2933
2934 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_027: [IoTHubTransport_MQTT_Common_DoWork shall inspect the "waitingToSend" DLIST passed in config structure.] */
2935 size_t messageLength;
2936 const unsigned char* messagePayload = NULL;
2937 if (!RetrieveMessagePayload(iothubMsgList->messageHandle, &messagePayload, &messageLength))
2938 {
2939 (void)(DList_RemoveEntryList(currentListEntry));
2940 notifyApplicationOfSendMessageComplete(iothubMsgList, transport_data, IOTHUB_CLIENT_CONFIRMATION_ERROR);
2941 LogError("Failure result from IoTHubMessage_GetData");
2942 }
2943 else
2944 {
2945 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_029: [IoTHubTransport_MQTT_Common_DoWork shall create a MQTT_MESSAGE_HANDLE and pass this to a call to mqtt_client_publish.] */
2946 MQTT_MESSAGE_DETAILS_LIST* mqttMsgEntry = (MQTT_MESSAGE_DETAILS_LIST*)malloc(sizeof(MQTT_MESSAGE_DETAILS_LIST));
2947 if (mqttMsgEntry == NULL)
2948 {
2949 LogError("Allocation Error: Failure allocating MQTT Message Detail List.");
2950 }
2951 else
2952 {
2953 mqttMsgEntry->retryCount = 0;
2954 mqttMsgEntry->iotHubMessageEntry = iothubMsgList;
2955 mqttMsgEntry->packet_id = getNextPacketId(transport_data);
2956 if (publishTelemetryMsg(transport_data, mqttMsgEntry, messagePayload, messageLength) != 0)
2957 {
2958 (void)(DList_RemoveEntryList(currentListEntry));
2959 notifyApplicationOfSendMessageComplete(iothubMsgList, transport_data, IOTHUB_CLIENT_CONFIRMATION_ERROR);
2960 free(mqttMsgEntry);
2961 }
2962 else
2963 {
2964 // Remove the message from the waiting queue ...
2965 (void)(DList_RemoveEntryList(currentListEntry));
2966 // and add it to the ack queue
2967 DList_InsertTailList(&(transport_data->telemetry_waitingForAck), &(mqttMsgEntry->entry));
2968 }
2969 }
2970 }
2971 currentListEntry = savedFromCurrentListEntry.Flink;
2972 }
2973
2974 if (transport_data->twin_resp_sub_recv)
2975 {
2976 sendPendingGetTwinRequests(transport_data);
2977 }
2978}
2979
2980TRANSPORT_LL_HANDLE IoTHubTransport_MQTT_Common_Create(const IOTHUBTRANSPORT_CONFIG* config, MQTT_GET_IO_TRANSPORT get_io_transport, TRANSPORT_CALLBACKS_INFO* cb_info, void* ctx)
2981{
2982 PMQTTTRANSPORT_HANDLE_DATA result;
2983 size_t deviceIdSize;
2984
2985 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_001: [If parameter config is NULL then IoTHubTransport_MQTT_Common_Create shall return NULL.] */
2986 /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_07_041: [ if get_io_transport is NULL then IoTHubTransport_MQTT_Common_Create shall return NULL. ] */
2987 if (config == NULL || get_io_transport == NULL || cb_info == NULL)
2988 {
2989 LogError("Invalid Argument config: %p, get_io_transport: %p, cb_info: %p", config, get_io_transport, cb_info);
2990 result = NULL;
2991 }
2992 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_002: [If the parameter config's variables upperConfig, auth_module_handle or waitingToSend are NULL then IoTHubTransport_MQTT_Common_Create shall return NULL.] */
2993 else if (config->auth_module_handle == NULL)
2994 {
2995 LogError("Invalid Argument: auth_module_handle is NULL)");
2996 result = NULL;
2997 }
2998 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_002: [If the parameter config's variables upperConfig or waitingToSend are NULL then IoTHubTransport_MQTT_Common_Create shall return NULL.] */
2999 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_003: [If the upperConfig's variables deviceId, both deviceKey and deviceSasToken, iotHubName, protocol, or iotHubSuffix are NULL then IoTHubTransport_MQTT_Common_Create shall return NULL.] */
3000 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_03_003: [If both deviceKey & deviceSasToken fields are NOT NULL then IoTHubTransport_MQTT_Common_Create shall return NULL.] */
3001 else if (config->upperConfig == NULL ||
3002 config->upperConfig->protocol == NULL ||
3003 config->upperConfig->deviceId == NULL ||
3004 ((config->upperConfig->deviceKey != NULL) && (config->upperConfig->deviceSasToken != NULL)) ||
3005 config->upperConfig->iotHubName == NULL ||
3006 config->upperConfig->iotHubSuffix == NULL)
3007 {
3008 LogError("Invalid Argument: upperConfig structure contains an invalid parameter");
3009 result = NULL;
3010 }
3011 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_002: [If the parameter config's variables upperConfig, auth_module_handle or waitingToSend are NULL then IoTHubTransport_MQTT_Common_Create shall return NULL.] */
3012 else if (config->waitingToSend == NULL)
3013 {
3014 LogError("Invalid Argument: waitingToSend is NULL)");
3015 result = NULL;
3016 }
3017 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_006: [If the upperConfig's variables deviceId is an empty strings or length is greater then 128 then IoTHubTransport_MQTT_Common_Create shall return NULL.] */
3018 else if (((deviceIdSize = strlen(config->upperConfig->deviceId)) > 128U) || (deviceIdSize == 0))
3019 {
3020 LogError("Invalid Argument: DeviceId is of an invalid size");
3021 result = NULL;
3022 }
3023 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_003: [If the upperConfig's variables deviceId, both deviceKey and deviceSasToken, iotHubName, protocol, or iotHubSuffix are NULL then IoTHubTransport_MQTT_Common_Create shall return NULL.] */
3024 else if ((config->upperConfig->deviceKey != NULL) && (strlen(config->upperConfig->deviceKey) == 0))
3025 {
3026 LogError("Invalid Argument: deviceKey is empty");
3027 result = NULL;
3028 }
3029 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_003: [If the upperConfig's variables deviceId, both deviceKey and deviceSasToken, iotHubName, protocol, or iotHubSuffix are NULL then IoTHubTransport_MQTT_Common_Create shall return NULL.] */
3030 else if ((config->upperConfig->deviceSasToken != NULL) && (strlen(config->upperConfig->deviceSasToken) == 0))
3031 {
3032 LogError("Invalid Argument: deviceSasToken is empty");
3033 result = NULL;
3034 }
3035 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_003: [If the upperConfig's variables deviceId, deviceKey, iotHubName, protocol, or iotHubSuffix are NULL then IoTHubTransport_MQTT_Common_Create shall return NULL.] */
3036 else if (strlen(config->upperConfig->iotHubName) == 0)
3037 {
3038 LogError("Invalid Argument: iotHubName is empty");
3039 result = NULL;
3040 }
3041 else if (IoTHub_Transport_ValidateCallbacks(cb_info) != 0)
3042 {
3043 LogError("failure checking transport callbacks");
3044 result = NULL;
3045 }
3046 else
3047 {
3048 result = InitializeTransportHandleData(config->upperConfig, config->waitingToSend, config->auth_module_handle, config->moduleId);
3049 if (result != NULL)
3050 {
3051 result->get_io_transport = get_io_transport;
3052 result->http_proxy_hostname = NULL;
3053 result->http_proxy_username = NULL;
3054 result->http_proxy_password = NULL;
3055
3056 result->transport_ctx = ctx;
3057 memcpy(&result->transport_callbacks, cb_info, sizeof(TRANSPORT_CALLBACKS_INFO));
3058 }
3059 }
3060 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_009: [If any error is encountered then IoTHubTransport_MQTT_Common_Create shall return NULL.] */
3061 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_011: [On Success IoTHubTransport_MQTT_Common_Create shall return a non-NULL value.] */
3062 return result;
3063}
3064
3065void IoTHubTransport_MQTT_Common_Destroy(TRANSPORT_LL_HANDLE handle)
3066{
3067 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_012: [IoTHubTransport_MQTT_Common_Destroy shall do nothing if parameter handle is NULL.] */
3068 PMQTTTRANSPORT_HANDLE_DATA transport_data = (PMQTTTRANSPORT_HANDLE_DATA)handle;
3069 if (transport_data != NULL)
3070 {
3071 transport_data->isDestroyCalled = true;
3072
3073 DisconnectFromClient(transport_data);
3074
3075 //Empty the Waiting for Ack Messages.
3076 while (!DList_IsListEmpty(&transport_data->telemetry_waitingForAck))
3077 {
3078 PDLIST_ENTRY currentEntry = DList_RemoveHeadList(&transport_data->telemetry_waitingForAck);
3079 MQTT_MESSAGE_DETAILS_LIST* mqttMsgEntry = containingRecord(currentEntry, MQTT_MESSAGE_DETAILS_LIST, entry);
3080 notifyApplicationOfSendMessageComplete(mqttMsgEntry->iotHubMessageEntry, transport_data, IOTHUB_CLIENT_CONFIRMATION_BECAUSE_DESTROY);
3081 free(mqttMsgEntry);
3082 }
3083 while (!DList_IsListEmpty(&transport_data->ack_waiting_queue))
3084 {
3085 PDLIST_ENTRY currentEntry = DList_RemoveHeadList(&transport_data->ack_waiting_queue);
3086 MQTT_DEVICE_TWIN_ITEM* mqtt_device_twin = containingRecord(currentEntry, MQTT_DEVICE_TWIN_ITEM, entry);
3087
3088 if (mqtt_device_twin->userCallback == NULL)
3089 {
3090 transport_data->transport_callbacks.twin_rpt_state_complete_cb(mqtt_device_twin->iothub_msg_id, STATUS_CODE_TIMEOUT_VALUE, transport_data->transport_ctx);
3091 }
3092 else
3093 {
3094 mqtt_device_twin->userCallback(DEVICE_TWIN_UPDATE_COMPLETE, NULL, 0, mqtt_device_twin->userContext);
3095 }
3096
3097 destroyDeviceTwinGetMsg(mqtt_device_twin);
3098 }
3099 while (!DList_IsListEmpty(&transport_data->pending_get_twin_queue))
3100 {
3101 PDLIST_ENTRY currentEntry = DList_RemoveHeadList(&transport_data->pending_get_twin_queue);
3102
3103 MQTT_DEVICE_TWIN_ITEM* mqtt_device_twin = containingRecord(currentEntry, MQTT_DEVICE_TWIN_ITEM, entry);
3104
3105 mqtt_device_twin->userCallback(DEVICE_TWIN_UPDATE_COMPLETE, NULL, 0, mqtt_device_twin->userContext);
3106
3107 destroyDeviceTwinGetMsg(mqtt_device_twin);
3108 }
3109
3110 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_014: [IoTHubTransport_MQTT_Common_Destroy shall free all the resources currently in use.] */
3111 /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_01_012: [ `IoTHubTransport_MQTT_Common_Destroy` shall free the stored proxy options. ]*/
3112 freeTransportHandleData(transport_data);
3113 }
3114}
3115
3116int IoTHubTransport_MQTT_Common_Subscribe_DeviceTwin(TRANSPORT_LL_HANDLE handle)
3117{
3118 int result;
3119 PMQTTTRANSPORT_HANDLE_DATA transport_data = (PMQTTTRANSPORT_HANDLE_DATA)handle;
3120 if (transport_data == NULL)
3121 {
3122 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_042: [If the parameter handle is NULL than IoTHubTransport_MQTT_Common_Subscribe shall return a non-zero value.] */
3123 LogError("Invalid handle parameter. NULL.");
3124 result = MU_FAILURE;
3125 }
3126 else
3127 {
3128 if (transport_data->topic_GetState == NULL)
3129 {
3130 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_044: [IoTHubTransport_MQTT_Common_Subscribe_DeviceTwin shall construct the get state topic string and the notify state topic string.] */
3131 transport_data->topic_GetState = STRING_construct(TOPIC_GET_DESIRED_STATE);
3132 if (transport_data->topic_GetState == NULL)
3133 {
3134 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_046: [Upon failure IoTHubTransport_MQTT_Common_Subscribe_DeviceTwin shall return a non-zero value.] */
3135 LogError("Failure: unable constructing reported state topic");
3136 result = MU_FAILURE;
3137 }
3138 else
3139 {
3140 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_047: [On success IoTHubTransport_MQTT_Common_Subscribe_DeviceTwin shall return 0.] */
3141 transport_data->topics_ToSubscribe |= SUBSCRIBE_GET_REPORTED_STATE_TOPIC;
3142 result = 0;
3143 }
3144 }
3145 else
3146 {
3147 result = 0;
3148 }
3149 if (result == 0)
3150 {
3151 changeStateToSubscribeIfAllowed(transport_data);
3152 }
3153 }
3154 return result;
3155}
3156
3157void IoTHubTransport_MQTT_Common_Unsubscribe_DeviceTwin(TRANSPORT_LL_HANDLE handle)
3158{
3159 PMQTTTRANSPORT_HANDLE_DATA transport_data = (PMQTTTRANSPORT_HANDLE_DATA)handle;
3160 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_048: [If the parameter handle is NULL than IoTHubTransport_MQTT_Common_Unsubscribe_DeviceTwin shall do nothing.] */
3161 if (transport_data != NULL)
3162 {
3163 if (transport_data->topic_GetState != NULL)
3164 {
3165 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_049: [If subscribe_state is set to IOTHUB_DEVICE_TWIN_DESIRED_STATE then IoTHubTransport_MQTT_Common_Unsubscribe_DeviceTwin shall unsubscribe from the topic_GetState to the mqtt client.] */
3166 transport_data->topics_ToSubscribe &= ~SUBSCRIBE_GET_REPORTED_STATE_TOPIC;
3167 STRING_delete(transport_data->topic_GetState);
3168 transport_data->topic_GetState = NULL;
3169 }
3170 if (transport_data->topic_NotifyState != NULL)
3171 {
3172 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_050: [If subscribe_state is set to IOTHUB_DEVICE_TWIN_NOTIFICATION_STATE then IoTHubTransport_MQTT_Common_Unsubscribe_DeviceTwin shall unsubscribe from the topic_NotifyState to the mqtt client.] */
3173 transport_data->topics_ToSubscribe &= ~SUBSCRIBE_NOTIFICATION_STATE_TOPIC;
3174 STRING_delete(transport_data->topic_NotifyState);
3175 transport_data->topic_NotifyState = NULL;
3176 }
3177 }
3178 else
3179 {
3180 LogError("Invalid argument to unsubscribe (handle is NULL).");
3181 }
3182}
3183
3184IOTHUB_CLIENT_RESULT IoTHubTransport_MQTT_Common_GetTwinAsync(IOTHUB_DEVICE_HANDLE handle, IOTHUB_CLIENT_DEVICE_TWIN_CALLBACK completionCallback, void* callbackContext)
3185{
3186 IOTHUB_CLIENT_RESULT result;
3187
3188 // Codes_SRS_IOTHUB_MQTT_TRANSPORT_09_001: [ If `handle` or `completionCallback` are `NULL` than `IoTHubTransport_MQTT_Common_GetTwinAsync` shall return IOTHUB_CLIENT_INVALID_ARG. ]
3189 if (handle == NULL || completionCallback == NULL)
3190 {
3191 LogError("Invalid argument (handle=%p, completionCallback=%p)", handle, completionCallback);
3192 result = IOTHUB_CLIENT_INVALID_ARG;
3193 }
3194 else
3195 {
3196 PMQTTTRANSPORT_HANDLE_DATA transport_data = (PMQTTTRANSPORT_HANDLE_DATA)handle;
3197 MQTT_DEVICE_TWIN_ITEM* mqtt_info;
3198
3199 if ((mqtt_info = createDeviceTwinMsg(transport_data, RETRIEVE_PROPERTIES, 0)) == NULL)
3200 {
3201 LogError("Failed creating the device twin get request message");
3202 // Codes_SRS_IOTHUB_MQTT_TRANSPORT_09_003: [ If any failure occurs, IoTHubTransport_MQTT_Common_GetTwinAsync shall return IOTHUB_CLIENT_ERROR ]
3203 result = IOTHUB_CLIENT_ERROR;
3204 }
3205 else if (tickcounter_get_current_ms(transport_data->msgTickCounter, &mqtt_info->msgCreationTime) != 0)
3206 {
3207 LogError("Failed setting the get twin request enqueue time");
3208 destroyDeviceTwinGetMsg(mqtt_info);
3209 // Codes_SRS_IOTHUB_MQTT_TRANSPORT_09_003: [ If any failure occurs, IoTHubTransport_MQTT_Common_GetTwinAsync shall return IOTHUB_CLIENT_ERROR ]
3210 result = IOTHUB_CLIENT_ERROR;
3211 }
3212 else
3213 {
3214 mqtt_info->userCallback = completionCallback;
3215 mqtt_info->userContext = callbackContext;
3216
3217 // Codes_SRS_IOTHUB_MQTT_TRANSPORT_09_002: [ The request shall be queued to be sent when the transport is connected, through DoWork ]
3218 DList_InsertTailList(&transport_data->pending_get_twin_queue, &mqtt_info->entry);
3219
3220 // Codes_SRS_IOTHUB_MQTT_TRANSPORT_09_004: [ If no failure occurs, IoTHubTransport_MQTT_Common_GetTwinAsync shall return IOTHUB_CLIENT_OK ]
3221 result = IOTHUB_CLIENT_OK;
3222 }
3223 }
3224
3225 return result;
3226}
3227
3228int IoTHubTransport_MQTT_Common_Subscribe_DeviceMethod(IOTHUB_DEVICE_HANDLE handle)
3229{
3230 int result;
3231 PMQTTTRANSPORT_HANDLE_DATA transport_data = (PMQTTTRANSPORT_HANDLE_DATA)handle;
3232
3233 if (transport_data == NULL)
3234 {
3235 /*Codes_SRS_IOTHUB_MQTT_TRANSPORT_12_001 : [If the parameter handle is NULL than IoTHubTransport_MQTT_Common_Subscribe_DeviceMethod shall return a non - zero value.]*/
3236 LogError("Invalid handle parameter. NULL.");
3237 result = MU_FAILURE;
3238 }
3239 else
3240 {
3241 if (transport_data->topic_DeviceMethods == NULL)
3242 {
3243 /*Codes_SRS_IOTHUB_MQTT_TRANSPORT_12_004 : [IoTHubTransport_MQTT_Common_Subscribe_DeviceMethod shall construct the DEVICE_METHOD topic string for subscribe.]*/
3244 transport_data->topic_DeviceMethods = STRING_construct(TOPIC_DEVICE_METHOD_SUBSCRIBE);
3245 if (transport_data->topic_DeviceMethods == NULL)
3246 {
3247 /*Codes_SRS_IOTHUB_MQTT_TRANSPORT_12_006 : [Upon failure IoTHubTransport_MQTT_Common_Subscribe_DeviceMethod shall return a non - zero value.]*/
3248 LogError("Failure: unable constructing device method subscribe topic");
3249 result = MU_FAILURE;
3250 }
3251 else
3252 {
3253 /*Codes_SRS_IOTHUB_MQTT_TRANSPORT_12_003 : [IoTHubTransport_MQTT_Common_Subscribe_DeviceMethod shall set the signaling flag for DEVICE_METHOD topic for the receiver's topic list. ]*/
3254 transport_data->topics_ToSubscribe |= SUBSCRIBE_DEVICE_METHOD_TOPIC;
3255 result = 0;
3256 }
3257 }
3258 else
3259 {
3260 /*Codes_SRS_IOTHUB_MQTT_TRANSPORT_12_002 : [If the MQTT transport has been previously subscribed to DEVICE_METHOD topic IoTHubTransport_MQTT_Common_Subscribe_DeviceMethod shall do nothing and return 0.]*/
3261 result = 0;
3262 }
3263
3264 if (result == 0)
3265 {
3266 /*Codes_SRS_IOTHUB_MQTT_TRANSPORT_12_005 : [IoTHubTransport_MQTT_Common_Subscribe_DeviceMethod shall schedule the send of the subscription.]*/
3267 /*Codes_SRS_IOTHUB_MQTT_TRANSPORT_12_007 : [On success IoTHubTransport_MQTT_Common_Subscribe_DeviceMethod shall return 0.]*/
3268 changeStateToSubscribeIfAllowed(transport_data);
3269 }
3270 }
3271 return result;
3272}
3273
3274void IoTHubTransport_MQTT_Common_Unsubscribe_DeviceMethod(TRANSPORT_LL_HANDLE handle)
3275{
3276 PMQTTTRANSPORT_HANDLE_DATA transport_data = (PMQTTTRANSPORT_HANDLE_DATA)handle;
3277 /*Codes_SRS_IOTHUB_MQTT_TRANSPORT_12_008 : [If the parameter handle is NULL than IoTHubTransport_MQTT_Common_Unsubscribe_DeviceMethod shall do nothing and return.]*/
3278 if (transport_data != NULL)
3279 {
3280 /*Codes_SRS_IOTHUB_MQTT_TRANSPORT_12_009 : [If the MQTT transport has not been subscribed to DEVICE_METHOD topic IoTHubTransport_MQTT_Common_Unsubscribe_DeviceMethod shall do nothing and return.]*/
3281 if (transport_data->topic_DeviceMethods != NULL)
3282 {
3283 /*Codes_SRS_IOTHUB_MQTT_TRANSPORT_12_010 : [IoTHubTransport_MQTT_Common_Unsubscribe_DeviceMethod shall construct the DEVICE_METHOD topic string for unsubscribe.]*/
3284 const char* unsubscribe[1];
3285 unsubscribe[0] = STRING_c_str(transport_data->topic_DeviceMethods);
3286
3287 /*Codes_SRS_IOTHUB_MQTT_TRANSPORT_12_011 : [IoTHubTransport_MQTT_Common_Unsubscribe_DeviceMethod shall send the unsubscribe.]*/
3288 if (mqtt_client_unsubscribe(transport_data->mqttClient, getNextPacketId(transport_data), unsubscribe, 1) != 0)
3289 {
3290 LogError("Failure calling mqtt_client_unsubscribe");
3291 }
3292
3293 /*Codes_SRS_IOTHUB_MQTT_TRANSPORT_12_012 : [IoTHubTransport_MQTT_Common_Unsubscribe_DeviceMethod shall removes the signaling flag for DEVICE_METHOD topic from the receiver's topic list. ]*/
3294 STRING_delete(transport_data->topic_DeviceMethods);
3295 transport_data->topic_DeviceMethods = NULL;
3296 transport_data->topics_ToSubscribe &= ~SUBSCRIBE_DEVICE_METHOD_TOPIC;
3297 }
3298 }
3299 else
3300 {
3301 LogError("Invalid argument to unsubscribe (NULL).");
3302 }
3303}
3304
3305int IoTHubTransport_MQTT_Common_DeviceMethod_Response(TRANSPORT_LL_HANDLE handle, METHOD_HANDLE methodId, const unsigned char* response, size_t respSize, int status)
3306{
3307 int result;
3308 MQTTTRANSPORT_HANDLE_DATA* transport_data = (MQTTTRANSPORT_HANDLE_DATA*)handle;
3309 if (transport_data != NULL)
3310 {
3311 /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_07_042: [ IoTHubTransport_MQTT_Common_DeviceMethod_Response shall publish an mqtt message for the device method response. ] */
3312 DEVICE_METHOD_INFO* dev_method_info = (DEVICE_METHOD_INFO*)methodId;
3313 if (dev_method_info == NULL)
3314 {
3315 LogError("Failure: DEVICE_METHOD_INFO was NULL");
3316 result = MU_FAILURE;
3317 }
3318 else
3319 {
3320 if (publishDeviceMethodResponseMsg(transport_data, status, dev_method_info->request_id, response, respSize) != 0)
3321 {
3322 /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_07_051: [ If any error is encountered, IoTHubTransport_MQTT_Common_DeviceMethod_Response shall return a non-zero value. ] */
3323 LogError("Failure: publishing device method response");
3324 result = MU_FAILURE;
3325 }
3326 else
3327 {
3328 result = 0;
3329 }
3330 STRING_delete(dev_method_info->request_id);
3331 free(dev_method_info);
3332 }
3333 }
3334 else
3335 {
3336 /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_07_041: [ If the parameter handle is NULL than IoTHubTransport_MQTT_Common_DeviceMethod_Response shall return a non-zero value. ] */
3337 result = MU_FAILURE;
3338 LogError("Failure: invalid TRANSPORT_LL_HANDLE parameter specified");
3339 }
3340 return result;
3341}
3342
3343int IoTHubTransport_MQTT_Common_Subscribe(TRANSPORT_LL_HANDLE handle)
3344{
3345 int result;
3346 PMQTTTRANSPORT_HANDLE_DATA transport_data = (PMQTTTRANSPORT_HANDLE_DATA)handle;
3347 if (transport_data == NULL)
3348 {
3349 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_015: [If parameter handle is NULL than IoTHubTransport_MQTT_Common_Subscribe shall return a non-zero value.] */
3350 LogError("Invalid handle parameter. NULL.");
3351 result = MU_FAILURE;
3352 }
3353 else
3354 {
3355 /* Code_SRS_IOTHUB_MQTT_TRANSPORT_07_016: [IoTHubTransport_MQTT_Common_Subscribe shall set a flag to enable mqtt_client_subscribe to be called to subscribe to the Message Topic.] */
3356 transport_data->topic_MqttMessage = buildTopicMqttMsg(STRING_c_str(transport_data->device_id), STRING_c_str(transport_data->module_id));
3357 if (transport_data->topic_MqttMessage == NULL)
3358 {
3359 LogError("Failure constructing Message Topic");
3360 result = MU_FAILURE;
3361 }
3362 else
3363 {
3364 transport_data->topics_ToSubscribe |= SUBSCRIBE_TELEMETRY_TOPIC;
3365 /* Code_SRS_IOTHUB_MQTT_TRANSPORT_07_035: [If current packet state is not CONNACT, DISCONNECT_TYPE, or PACKET_TYPE_ERROR then IoTHubTransport_MQTT_Common_Subscribe shall set the packet state to SUBSCRIBE_TYPE.]*/
3366 changeStateToSubscribeIfAllowed(transport_data);
3367 result = 0;
3368 }
3369 }
3370 return result;
3371}
3372
3373void IoTHubTransport_MQTT_Common_Unsubscribe(TRANSPORT_LL_HANDLE handle)
3374{
3375 PMQTTTRANSPORT_HANDLE_DATA transport_data = (PMQTTTRANSPORT_HANDLE_DATA)handle;
3376 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_019: [If parameter handle is NULL then IoTHubTransport_MQTT_Common_Unsubscribe shall do nothing.] */
3377 if (transport_data != NULL)
3378 {
3379 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_020: [IoTHubTransport_MQTT_Common_Unsubscribe shall call mqtt_client_unsubscribe to unsubscribe the mqtt message topic.] */
3380 const char* unsubscribe[1];
3381 unsubscribe[0] = STRING_c_str(transport_data->topic_MqttMessage);
3382 if (mqtt_client_unsubscribe(transport_data->mqttClient, getNextPacketId(transport_data), unsubscribe, 1) != 0)
3383 {
3384 LogError("Failure calling mqtt_client_unsubscribe");
3385 }
3386 STRING_delete(transport_data->topic_MqttMessage);
3387 transport_data->topic_MqttMessage = NULL;
3388 transport_data->topics_ToSubscribe &= ~SUBSCRIBE_TELEMETRY_TOPIC;
3389 }
3390 else
3391 {
3392 LogError("Invalid argument to unsubscribe (NULL).");
3393 }
3394}
3395
3396IOTHUB_PROCESS_ITEM_RESULT IoTHubTransport_MQTT_Common_ProcessItem(TRANSPORT_LL_HANDLE handle, IOTHUB_IDENTITY_TYPE item_type, IOTHUB_IDENTITY_INFO* iothub_item)
3397{
3398 IOTHUB_PROCESS_ITEM_RESULT result;
3399 /* Codes_SRS_IOTHUBCLIENT_LL_07_001: [ If handle or iothub_item are NULL then IoTHubTransport_MQTT_Common_ProcessItem shall return IOTHUB_PROCESS_ERROR. ]*/
3400 if (handle == NULL || iothub_item == NULL)
3401 {
3402 LogError("Invalid handle parameter iothub_item=%p", iothub_item);
3403 result = IOTHUB_PROCESS_ERROR;
3404 }
3405 else
3406 {
3407 PMQTTTRANSPORT_HANDLE_DATA transport_data = (PMQTTTRANSPORT_HANDLE_DATA)handle;
3408
3409 if (transport_data->currPacketState == PUBLISH_TYPE)
3410 {
3411 // Ensure the reported property suback has been received
3412 if (item_type == IOTHUB_TYPE_DEVICE_TWIN && transport_data->twin_resp_sub_recv)
3413 {
3414 MQTT_DEVICE_TWIN_ITEM* mqtt_info = createDeviceTwinMsg(transport_data, REPORTED_STATE, iothub_item->device_twin->item_id);
3415 if (mqtt_info == NULL)
3416 {
3417 /* Codes_SRS_IOTHUBCLIENT_LL_07_004: [ If any errors are encountered IoTHubTransport_MQTT_Common_ProcessItem shall return IOTHUB_PROCESS_ERROR. ]*/
3418 result = IOTHUB_PROCESS_ERROR;
3419 }
3420 else
3421 {
3422 /*Codes_SRS_IOTHUBCLIENT_LL_07_003: [ IoTHubTransport_MQTT_Common_ProcessItem shall publish a message to the mqtt protocol with the message topic for the message type.]*/
3423 /* Codes_SRS_IOTHUBCLIENT_LL_07_005: [ If successful IoTHubTransport_MQTT_Common_ProcessItem shall add mqtt info structure acknowledgement queue. ] */
3424 DList_InsertTailList(&transport_data->ack_waiting_queue, &mqtt_info->entry);
3425
3426 if (publishDeviceTwinMsg(transport_data, iothub_item->device_twin, mqtt_info) != 0)
3427 {
3428 DList_RemoveEntryList(&mqtt_info->entry);
3429
3430 free(mqtt_info);
3431 /* Codes_SRS_IOTHUBCLIENT_LL_07_004: [ If any errors are encountered IoTHubTransport_MQTT_Common_ProcessItem shall return IOTHUB_PROCESS_ERROR. ]*/
3432 result = IOTHUB_PROCESS_ERROR;
3433 }
3434 else
3435 {
3436 result = IOTHUB_PROCESS_OK;
3437 }
3438 }
3439 }
3440 else
3441 {
3442 /* Codes_SRS_IOTHUBCLIENT_LL_07_006: [ If the item_type is not a supported type IoTHubTransport_MQTT_Common_ProcessItem shall return IOTHUB_PROCESS_CONTINUE. ]*/
3443 result = IOTHUB_PROCESS_CONTINUE;
3444 }
3445 }
3446 else
3447 {
3448 /* Codes_SRS_IOTHUBCLIENT_LL_07_002: [ If the mqtt is not ready to publish messages IoTHubTransport_MQTT_Common_ProcessItem shall return IOTHUB_PROCESS_NOT_CONNECTED. ] */
3449 result = IOTHUB_PROCESS_NOT_CONNECTED;
3450 }
3451 }
3452 return result;
3453}
3454
3455/* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_054: [ IoTHubTransport_MQTT_Common_DoWork shall subscribe to the Notification and get_state Topics if they are defined. ] */
3456void IoTHubTransport_MQTT_Common_DoWork(TRANSPORT_LL_HANDLE handle)
3457{
3458 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_026: [IoTHubTransport_MQTT_Common_DoWork shall do nothing if parameter handle and/or iotHubClientHandle is NULL.] */
3459 PMQTTTRANSPORT_HANDLE_DATA transport_data = (PMQTTTRANSPORT_HANDLE_DATA)handle;
3460 if (transport_data != NULL)
3461 {
3462 if (UpdateMqttConnectionStateIfNeeded(transport_data) == 0)
3463 {
3464 if (transport_data->mqttClientStatus == MQTT_CLIENT_STATUS_PENDING_CLOSE)
3465 {
3466 mqtt_client_disconnect(transport_data->mqttClient, NULL, NULL);
3467 transport_data->mqttClientStatus = MQTT_CLIENT_STATUS_NOT_CONNECTED;
3468 }
3469 else if (transport_data->currPacketState == CONNACK_TYPE || transport_data->currPacketState == SUBSCRIBE_TYPE)
3470 {
3471 SubscribeToMqttProtocol(transport_data);
3472 }
3473 else if (transport_data->currPacketState == SUBACK_TYPE)
3474 {
3475 ProcessSubackDoWork(transport_data);
3476 }
3477 else if (transport_data->currPacketState == PUBLISH_TYPE)
3478 {
3479 ProcessPublishStateDoWork(transport_data);
3480 }
3481 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_030: [IoTHubTransport_MQTT_Common_DoWork shall call mqtt_client_dowork everytime it is called if it is connected.] */
3482 mqtt_client_dowork(transport_data->mqttClient);
3483 }
3484
3485 // Check the ack messages timeouts
3486 ProcessPendingTelemetryMessages(transport_data);
3487 removeExpiredTwinRequests(transport_data);
3488 }
3489}
3490
3491int IoTHubTransport_MQTT_Common_SetRetryPolicy(TRANSPORT_LL_HANDLE handle, IOTHUB_CLIENT_RETRY_POLICY retryPolicy, size_t retryTimeoutLimitInSeconds)
3492{
3493 int result;
3494
3495 if (handle == NULL)
3496 {
3497 /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_25_041: [**If any handle is NULL then IoTHubTransport_MQTT_Common_SetRetryPolicy shall return resultant line.] */
3498 LogError("Invalid handle parameter. NULL.");
3499 result = MU_FAILURE;
3500 }
3501 else
3502 {
3503 RETRY_CONTROL_HANDLE new_retry_control_handle;
3504
3505 // Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_09_006: [ IoTHubTransport_MQTT_Common_SetRetryPolicy shall set the retry logic by calling retry_control_create() with retry policy and retryTimeout as parameters]
3506 // Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_09_009: [ If retry_control_create() fails then IoTHubTransport_MQTT_Common_SetRetryPolicy shall revert to previous retry policy and return non-zero value ]
3507 if ((new_retry_control_handle = retry_control_create(retryPolicy, (unsigned int)retryTimeoutLimitInSeconds)) == NULL)
3508 {
3509 LogError("Failed creating new retry control handle");
3510 result = MU_FAILURE;
3511 }
3512 else
3513 {
3514 PMQTTTRANSPORT_HANDLE_DATA transport_data = (PMQTTTRANSPORT_HANDLE_DATA)handle;
3515 RETRY_CONTROL_HANDLE previous_retry_control_handle = transport_data->retry_control_handle;
3516
3517 transport_data->retry_control_handle = new_retry_control_handle;
3518 retry_control_destroy(previous_retry_control_handle);
3519
3520 /*Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_25_045: [**If retry logic for specified parameters of retry policy and retryTimeoutLimitInSeconds is created successfully then IoTHubTransport_MQTT_Common_SetRetryPolicy shall return 0]*/
3521 result = 0;
3522 }
3523 }
3524
3525 return result;
3526}
3527
3528
3529IOTHUB_CLIENT_RESULT IoTHubTransport_MQTT_Common_GetSendStatus(TRANSPORT_LL_HANDLE handle, IOTHUB_CLIENT_STATUS *iotHubClientStatus)
3530{
3531 IOTHUB_CLIENT_RESULT result;
3532
3533 if (handle == NULL || iotHubClientStatus == NULL)
3534 {
3535 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_023: [IoTHubTransport_MQTT_Common_GetSendStatus shall return IOTHUB_CLIENT_INVALID_ARG if called with NULL parameter.] */
3536 LogError("invalid argument.");
3537 result = IOTHUB_CLIENT_INVALID_ARG;
3538 }
3539 else
3540 {
3541 MQTTTRANSPORT_HANDLE_DATA* handleData = (MQTTTRANSPORT_HANDLE_DATA*)handle;
3542 if (!DList_IsListEmpty(handleData->waitingToSend) || !DList_IsListEmpty(&(handleData->telemetry_waitingForAck)))
3543 {
3544 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_025: [IoTHubTransport_MQTT_Common_GetSendStatus shall return IOTHUB_CLIENT_OK and status IOTHUB_CLIENT_SEND_STATUS_BUSY if there are currently event items to be sent or being sent.] */
3545 *iotHubClientStatus = IOTHUB_CLIENT_SEND_STATUS_BUSY;
3546 }
3547 else
3548 {
3549 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_024: [IoTHubTransport_MQTT_Common_GetSendStatus shall return IOTHUB_CLIENT_OK and status IOTHUB_CLIENT_SEND_STATUS_IDLE if there are currently no event items to be sent or being sent.] */
3550 *iotHubClientStatus = IOTHUB_CLIENT_SEND_STATUS_IDLE;
3551 }
3552 result = IOTHUB_CLIENT_OK;
3553 }
3554 return result;
3555}
3556
3557IOTHUB_CLIENT_RESULT IoTHubTransport_MQTT_Common_SetOption(TRANSPORT_LL_HANDLE handle, const char* option, const void* value)
3558{
3559 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_021: [If any parameter is NULL then IoTHubTransport_MQTT_Common_SetOption shall return IOTHUB_CLIENT_INVALID_ARG.] */
3560 IOTHUB_CLIENT_RESULT result;
3561 if (
3562 (handle == NULL) ||
3563 (option == NULL) ||
3564 (value == NULL)
3565 )
3566 {
3567 result = IOTHUB_CLIENT_INVALID_ARG;
3568 LogError("invalid parameter (NULL) passed to IoTHubTransport_MQTT_Common_SetOption.");
3569 }
3570 else
3571 {
3572 MQTTTRANSPORT_HANDLE_DATA* transport_data = (MQTTTRANSPORT_HANDLE_DATA*)handle;
3573
3574 IOTHUB_CREDENTIAL_TYPE cred_type = IoTHubClient_Auth_Get_Credential_Type(transport_data->authorization_module);
3575
3576 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_031: [If the option parameter is set to "logtrace" then the value shall be a bool_ptr and the value will determine if the mqtt client log is on or off.] */
3577 if (strcmp(OPTION_LOG_TRACE, option) == 0)
3578 {
3579 transport_data->log_trace = *((bool*)value);
3580 mqtt_client_set_trace(transport_data->mqttClient, transport_data->log_trace, transport_data->raw_trace);
3581 result = IOTHUB_CLIENT_OK;
3582 }
3583 else if (strcmp("rawlogtrace", option) == 0)
3584 {
3585 transport_data->raw_trace = *((bool*)value);
3586 mqtt_client_set_trace(transport_data->mqttClient, transport_data->log_trace, transport_data->raw_trace);
3587 result = IOTHUB_CLIENT_OK;
3588 }
3589 else if (strcmp(OPTION_AUTO_URL_ENCODE_DECODE, option) == 0)
3590 {
3591 transport_data->auto_url_encode_decode = *((bool*)value);
3592 result = IOTHUB_CLIENT_OK;
3593 }
3594 else if (strcmp(OPTION_CONNECTION_TIMEOUT, option) == 0)
3595 {
3596 int* connection_time = (int*)value;
3597 if (*connection_time != transport_data->connect_timeout_in_sec)
3598 {
3599 transport_data->connect_timeout_in_sec = (uint16_t)(*connection_time);
3600 }
3601 result = IOTHUB_CLIENT_OK;
3602 }
3603 else if (strcmp(OPTION_KEEP_ALIVE, option) == 0)
3604 {
3605 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_036: [If the option parameter is set to "keepalive" then the value shall be a int_ptr and the value will determine the mqtt keepalive time that is set for pings.] */
3606 int* keepAliveOption = (int*)value;
3607 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_037 : [If the option parameter is set to supplied int_ptr keepalive is the same value as the existing keepalive then IoTHubTransport_MQTT_Common_SetOption shall do nothing.] */
3608 if (*keepAliveOption != transport_data->keepAliveValue)
3609 {
3610 transport_data->keepAliveValue = (uint16_t)(*keepAliveOption);
3611 if (transport_data->mqttClientStatus != MQTT_CLIENT_STATUS_NOT_CONNECTED)
3612 {
3613 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_038: [If the client is connected when the keepalive is set then IoTHubTransport_MQTT_Common_SetOption shall disconnect and reconnect with the specified keepalive value.] */
3614 DisconnectFromClient(transport_data);
3615 }
3616 }
3617 result = IOTHUB_CLIENT_OK;
3618 }
3619 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_039: [If the option parameter is set to "x509certificate" then the value shall be a const char of the certificate to be used for x509.] */
3620 else if ((strcmp(OPTION_X509_CERT, option) == 0) && (cred_type != IOTHUB_CREDENTIAL_TYPE_X509 && cred_type != IOTHUB_CREDENTIAL_TYPE_UNKNOWN))
3621 {
3622 LogError("x509certificate specified, but authentication method is not x509");
3623 result = IOTHUB_CLIENT_INVALID_ARG;
3624 }
3625 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_040: [If the option parameter is set to "x509privatekey" then the value shall be a const char of the RSA Private Key to be used for x509.] */
3626 else if ((strcmp(OPTION_X509_PRIVATE_KEY, option) == 0) && (cred_type != IOTHUB_CREDENTIAL_TYPE_X509 && cred_type != IOTHUB_CREDENTIAL_TYPE_UNKNOWN))
3627 {
3628 LogError("x509privatekey specified, but authentication method is not x509");
3629 result = IOTHUB_CLIENT_INVALID_ARG;
3630 }
3631 else if (strcmp(OPTION_RETRY_INTERVAL_SEC, option) == 0)
3632 {
3633 if (retry_control_set_option(transport_data->retry_control_handle, RETRY_CONTROL_OPTION_INITIAL_WAIT_TIME_IN_SECS, value) != 0)
3634 {
3635 LogError("Failure setting retry interval option");
3636 result = IOTHUB_CLIENT_ERROR;
3637 }
3638 else
3639 {
3640 result = IOTHUB_CLIENT_OK;
3641 }
3642 }
3643 else if (strcmp(OPTION_RETRY_MAX_DELAY_SECS, option) == 0)
3644 {
3645 if (retry_control_set_option(transport_data->retry_control_handle, RETRY_CONTROL_OPTION_MAX_DELAY_IN_SECS, value) != 0)
3646 {
3647 LogError("Failure setting retry max delay option");
3648 result = IOTHUB_CLIENT_ERROR;
3649 }
3650 else
3651 {
3652 result = IOTHUB_CLIENT_OK;
3653 }
3654 }
3655 else if (strcmp(OPTION_HTTP_PROXY, option) == 0)
3656 {
3657 /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_01_001: [ If `option` is `proxy_data`, `value` shall be used as an `HTTP_PROXY_OPTIONS*`. ]*/
3658 HTTP_PROXY_OPTIONS* proxy_options = (HTTP_PROXY_OPTIONS*)value;
3659
3660 if (transport_data->xioTransport != NULL)
3661 {
3662 /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_01_007: [ If the underlying IO has already been created, then `IoTHubTransport_MQTT_Common_SetOption` shall fail and return `IOTHUB_CLIENT_ERROR`. ]*/
3663 LogError("Cannot set proxy option once the underlying IO is created");
3664 result = IOTHUB_CLIENT_ERROR;
3665 }
3666 else if (proxy_options->host_address == NULL)
3667 {
3668 /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_01_003: [ If `host_address` is NULL, `IoTHubTransport_MQTT_Common_SetOption` shall fail and return `IOTHUB_CLIENT_INVALID_ARG`. ]*/
3669 LogError("NULL host_address in proxy options");
3670 result = IOTHUB_CLIENT_INVALID_ARG;
3671 }
3672 /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_01_006: [ If only one of `username` and `password` is NULL, `IoTHubTransport_MQTT_Common_SetOption` shall fail and return `IOTHUB_CLIENT_INVALID_ARG`. ]*/
3673 else if (((proxy_options->username == NULL) || (proxy_options->password == NULL)) &&
3674 (proxy_options->username != proxy_options->password))
3675 {
3676 LogError("Only one of username and password for proxy settings was NULL");
3677 result = IOTHUB_CLIENT_INVALID_ARG;
3678 }
3679 else
3680 {
3681 char* copied_proxy_hostname = NULL;
3682 char* copied_proxy_username = NULL;
3683 char* copied_proxy_password = NULL;
3684
3685 /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_01_002: [ The fields `host_address`, `port`, `username` and `password` shall be saved for later used (needed when creating the underlying IO to be used by the transport). ]*/
3686 transport_data->http_proxy_port = proxy_options->port;
3687 if (mallocAndStrcpy_s(&copied_proxy_hostname, proxy_options->host_address) != 0)
3688 {
3689 /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_01_004: [ If copying `host_address`, `username` or `password` fails, `IoTHubTransport_MQTT_Common_SetOption` shall fail and return `IOTHUB_CLIENT_ERROR`. ]*/
3690 LogError("Cannot copy HTTP proxy hostname");
3691 result = IOTHUB_CLIENT_ERROR;
3692 }
3693 /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_01_005: [ `username` and `password` shall be allowed to be NULL. ]*/
3694 else if ((proxy_options->username != NULL) && (mallocAndStrcpy_s(&copied_proxy_username, proxy_options->username) != 0))
3695 {
3696 /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_01_004: [ If copying `host_address`, `username` or `password` fails, `IoTHubTransport_MQTT_Common_SetOption` shall fail and return `IOTHUB_CLIENT_ERROR`. ]*/
3697 free(copied_proxy_hostname);
3698 LogError("Cannot copy HTTP proxy username");
3699 result = IOTHUB_CLIENT_ERROR;
3700 }
3701 /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_01_005: [ `username` and `password` shall be allowed to be NULL. ]*/
3702 else if ((proxy_options->password != NULL) && (mallocAndStrcpy_s(&copied_proxy_password, proxy_options->password) != 0))
3703 {
3704 /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_01_004: [ If copying `host_address`, `username` or `password` fails, `IoTHubTransport_MQTT_Common_SetOption` shall fail and return `IOTHUB_CLIENT_ERROR`. ]*/
3705 if (copied_proxy_username != NULL)
3706 {
3707 free(copied_proxy_username);
3708 }
3709 free(copied_proxy_hostname);
3710 LogError("Cannot copy HTTP proxy password");
3711 result = IOTHUB_CLIENT_ERROR;
3712 }
3713 else
3714 {
3715 /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_01_009: [ When setting the proxy options succeeds any previously saved proxy options shall be freed. ]*/
3716 freeProxyData(transport_data);
3717
3718 transport_data->http_proxy_hostname = copied_proxy_hostname;
3719 transport_data->http_proxy_username = copied_proxy_username;
3720 transport_data->http_proxy_password = copied_proxy_password;
3721
3722 /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_01_008: [ If setting the `proxy_data` option suceeds, `IoTHubTransport_MQTT_Common_SetOption` shall return `IOTHUB_CLIENT_OK` ]*/
3723 result = IOTHUB_CLIENT_OK;
3724 }
3725 }
3726 }
3727 else
3728 {
3729 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_039: [If the option parameter is set to "x509certificate" then the value shall be a const char of the certificate to be used for x509.] */
3730 if (((strcmp(OPTION_X509_CERT, option) == 0) || (strcmp(OPTION_X509_PRIVATE_KEY, option) == 0)) && (cred_type != IOTHUB_CREDENTIAL_TYPE_X509))
3731 {
3732 IoTHubClient_Auth_Set_x509_Type(transport_data->authorization_module, true);
3733 }
3734
3735 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_032: [IoTHubTransport_MQTT_Common_SetOption shall pass down the option to xio_setoption if the option parameter is not a known option string for the MQTT transport.] */
3736 if (CreateTransportProviderIfNecessary(transport_data) == 0)
3737 {
3738 if (xio_setoption(transport_data->xioTransport, option, value) == 0)
3739 {
3740 result = IOTHUB_CLIENT_OK;
3741 }
3742 else
3743 {
3744 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_132: [IoTHubTransport_MQTT_Common_SetOption shall return IOTHUB_CLIENT_INVALID_ARG xio_setoption fails] */
3745 result = IOTHUB_CLIENT_INVALID_ARG;
3746 }
3747 }
3748 else
3749 {
3750 result = IOTHUB_CLIENT_ERROR;
3751 }
3752 }
3753 }
3754 return result;
3755}
3756
3757TRANSPORT_LL_HANDLE IoTHubTransport_MQTT_Common_Register(TRANSPORT_LL_HANDLE handle, const IOTHUB_DEVICE_CONFIG* device, PDLIST_ENTRY waitingToSend)
3758{
3759 TRANSPORT_LL_HANDLE result = NULL;
3760
3761 // Codes_SRS_IOTHUB_MQTT_TRANSPORT_17_001: [ IoTHubTransport_MQTT_Common_Register shall return NULL if the TRANSPORT_LL_HANDLE is NULL.]
3762 // Codes_SRS_IOTHUB_MQTT_TRANSPORT_17_002: [ IoTHubTransport_MQTT_Common_Register shall return NULL if device or waitingToSend are NULL.]
3763 if ((handle == NULL) || (device == NULL) || (waitingToSend == NULL))
3764 {
3765 LogError("IoTHubTransport_MQTT_Common_Register: handle, device or waitingToSend is NULL.");
3766 result = NULL;
3767 }
3768 else
3769 {
3770 MQTTTRANSPORT_HANDLE_DATA* transport_data = (MQTTTRANSPORT_HANDLE_DATA*)handle;
3771
3772 // Codes_SRS_IOTHUB_MQTT_TRANSPORT_03_001: [ IoTHubTransport_MQTT_Common_Register shall return NULL if deviceId, or both deviceKey and deviceSasToken are NULL.]
3773 if (device->deviceId == NULL)
3774 {
3775 LogError("IoTHubTransport_MQTT_Common_Register: deviceId is NULL.");
3776 result = NULL;
3777 }
3778 // Codes_SRS_IOTHUB_MQTT_TRANSPORT_03_002: [ IoTHubTransport_MQTT_Common_Register shall return NULL if both deviceKey and deviceSasToken are provided.]
3779 else if ((device->deviceKey != NULL) && (device->deviceSasToken != NULL))
3780 {
3781 LogError("IoTHubTransport_MQTT_Common_Register: Both deviceKey and deviceSasToken are defined. Only one can be used.");
3782 result = NULL;
3783 }
3784 else
3785 {
3786 // Codes_SRS_IOTHUB_MQTT_TRANSPORT_17_003: [ IoTHubTransport_MQTT_Common_Register shall return NULL if deviceId or deviceKey do not match the deviceId and deviceKey passed in during IoTHubTransport_MQTT_Common_Create.]
3787 if (strcmp(STRING_c_str(transport_data->device_id), device->deviceId) != 0)
3788 {
3789 LogError("IoTHubTransport_MQTT_Common_Register: deviceId does not match.");
3790 result = NULL;
3791 }
3792 else if (!checkModuleIdsEqual(STRING_c_str(transport_data->module_id), device->moduleId))
3793 {
3794 LogError("IoTHubTransport_MQTT_Common_Register: moduleId does not match.");
3795 result = NULL;
3796 }
3797 // Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_43_001: [ IoTHubTransport_MQTT_Common_Register shall return NULL if deviceKey is NULL when credential type is IOTHUB_CREDENTIAL_TYPE_DEVICE_KEY ]
3798 else if (IoTHubClient_Auth_Get_Credential_Type(transport_data->authorization_module) == IOTHUB_CREDENTIAL_TYPE_DEVICE_KEY &&
3799 ((device->deviceKey == NULL) || (strcmp(IoTHubClient_Auth_Get_DeviceKey(transport_data->authorization_module), device->deviceKey) != 0)))
3800 {
3801 LogError("IoTHubTransport_MQTT_Common_Register: deviceKey does not match.");
3802 result = NULL;
3803 }
3804 else
3805 {
3806 if (transport_data->isRegistered == true)
3807 {
3808 LogError("Transport already has device registered by id: [%s]", device->deviceId);
3809 result = NULL;
3810 }
3811 else
3812 {
3813 transport_data->isRegistered = true;
3814 // Codes_SRS_IOTHUB_MQTT_TRANSPORT_17_004: [ IoTHubTransport_MQTT_Common_Register shall return the TRANSPORT_LL_HANDLE as the TRANSPORT_LL_HANDLE. ]
3815 result = (TRANSPORT_LL_HANDLE)handle;
3816 }
3817 }
3818 }
3819 }
3820
3821 return result;
3822}
3823
3824void IoTHubTransport_MQTT_Common_Unregister(TRANSPORT_LL_HANDLE deviceHandle)
3825{
3826 // Codes_SRS_IOTHUB_MQTT_TRANSPORT_17_005: [ If deviceHandle is NULL `IoTHubTransport_MQTT_Common_Unregister` shall do nothing. ]
3827 if (deviceHandle != NULL)
3828 {
3829 MQTTTRANSPORT_HANDLE_DATA* transport_data = (MQTTTRANSPORT_HANDLE_DATA*)deviceHandle;
3830
3831 transport_data->isRegistered = false;
3832 }
3833}
3834
3835STRING_HANDLE IoTHubTransport_MQTT_Common_GetHostname(TRANSPORT_LL_HANDLE handle)
3836{
3837 STRING_HANDLE result;
3838 /*Codes_SRS_IOTHUB_MQTT_TRANSPORT_02_001: [ If handle is NULL then IoTHubTransport_MQTT_Common_GetHostname shall fail and return NULL. ]*/
3839 if (handle == NULL)
3840 {
3841 result = NULL;
3842 }
3843 /*Codes_SRS_IOTHUB_MQTT_TRANSPORT_02_002: [ Otherwise IoTHubTransport_MQTT_Common_GetHostname shall return a non-NULL STRING_HANDLE containg the hostname. ]*/
3844 else if ((result = STRING_clone(((MQTTTRANSPORT_HANDLE_DATA*)(handle))->hostAddress)) == NULL)
3845 {
3846 LogError("Cannot provide the target host name (STRING_clone failed).");
3847 }
3848
3849 return result;
3850}
3851
3852IOTHUB_CLIENT_RESULT IoTHubTransport_MQTT_Common_SendMessageDisposition(MESSAGE_CALLBACK_INFO* message_data, IOTHUBMESSAGE_DISPOSITION_RESULT disposition)
3853{
3854 (void)disposition;
3855
3856 IOTHUB_CLIENT_RESULT result;
3857 if (message_data)
3858 {
3859 if (message_data->messageHandle)
3860 {
3861 IoTHubMessage_Destroy(message_data->messageHandle);
3862 result = IOTHUB_CLIENT_OK;
3863 }
3864 else
3865 {
3866 /*Codes_SRS_IOTHUB_MQTT_TRANSPORT_10_002: [If any of the messageData fields are NULL, IoTHubTransport_MQTT_Common_SendMessageDisposition shall fail and return IOTHUB_CLIENT_ERROR. ]*/
3867 LogError("message handle is NULL");
3868 result = IOTHUB_CLIENT_ERROR;
3869 }
3870 free(message_data);
3871 }
3872 else
3873 {
3874 /*Codes_SRS_IOTHUB_MQTT_TRANSPORT_10_001: [If messageData is NULL, IoTHubTransport_MQTT_Common_SendMessageDisposition shall fail and return IOTHUB_CLIENT_ERROR. ]*/
3875 LogError("message_data is NULL");
3876 result = IOTHUB_CLIENT_ERROR;
3877 }
3878 return result;
3879}
3880
3881
3882int IoTHubTransport_MQTT_Common_Subscribe_InputQueue(TRANSPORT_LL_HANDLE handle)
3883{
3884 int result;
3885 PMQTTTRANSPORT_HANDLE_DATA transport_data = (PMQTTTRANSPORT_HANDLE_DATA)handle;
3886 if (transport_data == NULL)
3887 {
3888 // Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_31_066: [ If parameter handle is NULL than IoTHubTransport_MQTT_Common_Subscribe_InputQueue shall return a non-zero value.]
3889 LogError("Invalid handle parameter. NULL.");
3890 result = MU_FAILURE;
3891 }
3892 else if (transport_data->module_id == NULL)
3893 {
3894 // Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_31_073: [ If module ID is not set on the transpont, IoTHubTransport_MQTT_Common_Unsubscribe_InputQueue shall fail.]
3895 LogError("ModuleID must be specified for input queues. NULL.");
3896 result = MU_FAILURE;
3897 }
3898 else if ((transport_data->topic_InputQueue == NULL) &&
3899 (transport_data->topic_InputQueue = STRING_construct_sprintf(TOPIC_INPUT_QUEUE_NAME, STRING_c_str(transport_data->device_id), STRING_c_str(transport_data->module_id))) == NULL)
3900 {
3901 LogError("Failure constructing Message Topic");
3902 result = MU_FAILURE;
3903 }
3904 else
3905 {
3906 // Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_31_067: [ IoTHubTransport_MQTT_Common_Subscribe_InputQueue shall set a flag to enable mqtt_client_subscribe to be called to subscribe to the input queue Message Topic.]
3907 transport_data->topics_ToSubscribe |= SUBSCRIBE_INPUT_QUEUE_TOPIC;
3908 changeStateToSubscribeIfAllowed(transport_data);
3909 // Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_31_070: [ On success IoTHubTransport_MQTT_Common_Subscribe_InputQueue shall return 0.]
3910 result = 0;
3911 }
3912 return result;
3913}
3914
3915void IoTHubTransport_MQTT_Common_Unsubscribe_InputQueue(TRANSPORT_LL_HANDLE handle)
3916{
3917 PMQTTTRANSPORT_HANDLE_DATA transport_data = (PMQTTTRANSPORT_HANDLE_DATA)handle;
3918 if ((transport_data != NULL) && (transport_data->topic_InputQueue != NULL))
3919 {
3920 // Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_31_072: [ IoTHubTransport_MQTT_Common_Unsubscribe_InputQueue shall call mqtt_client_unsubscribe to unsubscribe the mqtt input queue message topic.]
3921 const char* unsubscribe[1];
3922 unsubscribe[0] = STRING_c_str(transport_data->topic_InputQueue);
3923 if (mqtt_client_unsubscribe(transport_data->mqttClient, getNextPacketId(transport_data), unsubscribe, 1) != 0)
3924 {
3925 LogError("Failure calling mqtt_client_unsubscribe");
3926 }
3927 STRING_delete(transport_data->topic_InputQueue);
3928 transport_data->topic_InputQueue = NULL;
3929 transport_data->topics_ToSubscribe &= ~SUBSCRIBE_INPUT_QUEUE_TOPIC;
3930 }
3931 else
3932 {
3933 // Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_31_071: [ If parameter handle is NULL then IoTHubTransport_MQTT_Common_Unsubscribe_InputQueue shall do nothing.]
3934 LogError("Invalid argument to unsubscribe input queue (NULL).");
3935 }
3936}
3937
3938int IoTHubTransport_MQTT_SetCallbackContext(TRANSPORT_LL_HANDLE handle, void* ctx)
3939{
3940 int result;
3941 if (handle == NULL)
3942 {
3943 LogError("Invalid parameter specified handle: %p", handle);
3944 result = MU_FAILURE;
3945 }
3946 else
3947 {
3948 MQTTTRANSPORT_HANDLE_DATA* transport_data = (MQTTTRANSPORT_HANDLE_DATA*)handle;
3949 transport_data->transport_ctx = ctx;
3950 result = 0;
3951 }
3952 return result;
3953}
3954
3955int IoTHubTransport_MQTT_GetSupportedPlatformInfo(TRANSPORT_LL_HANDLE handle, PLATFORM_INFO_OPTION* info)
3956{
3957 int result;
3958
3959 if (handle == NULL || info == NULL)
3960 {
3961 LogError("Invalid parameter specified (handle: %p, info: %p)", handle, info);
3962 result = MU_FAILURE;
3963 }
3964 else
3965 {
3966 *info = PLATFORM_INFO_OPTION_RETRIEVE_SQM;
3967 result = 0;
3968 }
3969
3970 return result;
3971}
Note: See TracBrowser for help on using the repository browser.