source: azure_iot_hub_riscv/trunk/azure_iot_sdk/iothub_client/src/iothubtransport_mqtt_common.c@ 453

Last change on this file since 453 was 453, checked in by coas-nagasima, 4 years ago

ファイルを追加

  • Property svn:eol-style set to native
  • Property svn:mime-type set to text/x-csrc;charset=UTF-8
File size: 160.9 KB
Line 
1// Copyright (c) Microsoft. All rights reserved.
2// Licensed under the MIT license. See LICENSE file in the project root for full license information.
3
4#include <stdlib.h>
5#include <ctype.h>
6
7#include "azure_c_shared_utility/optimize_size.h"
8#include "azure_c_shared_utility/gballoc.h"
9#include "azure_c_shared_utility/xlogging.h"
10#include "azure_c_shared_utility/strings.h"
11#include "azure_c_shared_utility/doublylinkedlist.h"
12#include "azure_c_shared_utility/crt_abstractions.h"
13#include "azure_c_shared_utility/agenttime.h"
14#include "azure_c_shared_utility/threadapi.h"
15#include "azure_c_shared_utility/sastoken.h"
16#include "azure_c_shared_utility/tickcounter.h"
17#include "azure_c_shared_utility/tlsio.h"
18#include "azure_c_shared_utility/platform.h"
19#include "azure_c_shared_utility/string_tokenizer.h"
20#include "azure_c_shared_utility/shared_util_options.h"
21#include "azure_c_shared_utility/urlencode.h"
22
23#include "internal/iothub_client_private.h"
24#include "internal/iothub_client_retry_control.h"
25#include "internal/iothub_transport_ll_private.h"
26#include "internal/iothubtransport_mqtt_common.h"
27#include "internal/iothubtransport.h"
28#include "internal/iothub_internal_consts.h"
29
30#include "azure_umqtt_c/mqtt_client.h"
31
32#include "iothub_client_options.h"
33#include "iothub_client_version.h"
34
35#include <stdarg.h>
36#include <stdio.h>
37
38#include <limits.h>
39#include <inttypes.h>
40
// Tunable constants for the MQTT transport.
// Every macro that expands to an arithmetic expression is parenthesized so it
// keeps its value when used as an operand of a larger expression
// (for example `x / DEFAULT_MQTT_KEEPALIVE`).
#define SAS_REFRESH_MULTIPLIER .8
#define EPOCH_TIME_T_VALUE 0
#define DEFAULT_MQTT_KEEPALIVE (4*60) // 4 min
#define DEFAULT_CONNACK_TIMEOUT 30 // 30 seconds
#define BUILD_CONFIG_USERNAME 24
#define SAS_TOKEN_DEFAULT_LEN 10
#define RESEND_TIMEOUT_VALUE_MIN (1*60)
#define MAX_SEND_RECOUNT_LIMIT 2
#define DEFAULT_CONNECTION_INTERVAL 30
#define FAILED_CONN_BACKOFF_VALUE 5
#define STATUS_CODE_FAILURE_VALUE 500
#define STATUS_CODE_TIMEOUT_VALUE 408

#define DEFAULT_RETRY_POLICY IOTHUB_CLIENT_RETRY_EXPONENTIAL_BACKOFF_WITH_JITTER
#define DEFAULT_RETRY_TIMEOUT_IN_SECONDS 0
#define MAX_DISCONNECT_VALUE 50

#define ON_DEMAND_GET_TWIN_REQUEST_TIMEOUT_SECS 60
#define TWIN_REPORT_UPDATE_TIMEOUT_SECS (60*5)
60
// MQTT topic strings, topic templates and property names used by this transport.
// Entries that contain %s, %d or PRIu16 conversions are printf-style templates
// that are expanded before use (see the STRING_construct_sprintf calls in this file).

// Prefixes used by retrieve_topic_type() to classify an incoming topic.
61static const char TOPIC_DEVICE_TWIN_PREFIX[] = "$iothub/twin";
62static const char TOPIC_DEVICE_METHOD_PREFIX[] = "$iothub/methods";
63
// Device-twin topic filters ('#' is the MQTT multi-level wildcard).
64static const char* TOPIC_GET_DESIRED_STATE = "$iothub/twin/res/#";
65static const char* TOPIC_NOTIFICATION_STATE = "$iothub/twin/PATCH/properties/desired/#";
66
// Message topic templates; the %s placeholders are presumably the device id and,
// for the *_MODULE variants, the module id -- confirm against the code that formats them.
67static const char* TOPIC_DEVICE_MSG = "devices/%s/messages/devicebound/#";
68static const char* TOPIC_DEVICE_MODULE_MSG = "devices/%s/modules/%s/messages/devicebound/#";
69static const char* TOPIC_DEVICE_DEVICE = "devices/%s/messages/events/";
70static const char* TOPIC_DEVICE_DEVICE_MODULE = "devices/%s/modules/%s/messages/events/";
71
72static const char* TOPIC_INPUT_QUEUE_NAME = "devices/%s/modules/%s/#";
73
74static const char* TOPIC_DEVICE_METHOD_SUBSCRIBE = "$iothub/methods/POST/#";
75
// Separator placed between key=value pairs appended to a topic string.
76static const char* PROPERTY_SEPARATOR = "&";
// Twin request templates: the PRIu16 argument is a 16-bit request id.
77static const char* REPORTED_PROPERTIES_TOPIC = "$iothub/twin/PATCH/properties/reported/?$rid=%"PRIu16;
78static const char* GET_PROPERTIES_TOPIC = "$iothub/twin/GET/?$rid=%"PRIu16;
// Method response template: %d is the status code, %s the request id
// (see publish_device_method_message()).
79static const char* DEVICE_METHOD_RESPONSE_TOPIC = "$iothub/methods/res/%d/?$rid=%s";
80
// Marker that precedes the request id inside a topic segment.
81static const char* REQUEST_ID_PROPERTY = "?$rid=";
82
// Short system-property names; they are emitted as "%24.<name>=<value>"
// by addSystemPropertyToTopicString().
83static const char* MESSAGE_ID_PROPERTY = "mid";
84static const char* CORRELATION_ID_PROPERTY = "cid";
85static const char* CONTENT_TYPE_PROPERTY = "ct";
86static const char* CONTENT_ENCODING_PROPERTY = "ce";
87static const char* DIAGNOSTIC_ID_PROPERTY = "diagid";
88static const char* DIAGNOSTIC_CONTEXT_PROPERTY = "diagctx";
89static const char* CONNECTION_DEVICE_ID = "cdid";
90static const char* CONNECTION_MODULE_ID_PROPERTY = "cmid";
91
// Key used inside the diagnostic context value ("creationtimeutc=<time>").
92static const char* DIAGNOSTIC_CONTEXT_CREATION_TIME_UTC_PROPERTY = "creationtimeutc";
93
94static const char DT_MODEL_ID_TOKEN[] = "model-id";
95
// Built by compile-time concatenation of two macros defined outside this file.
96static const char DEFAULT_IOTHUB_PRODUCT_IDENTIFIER[] = CLIENT_DEVICE_TYPE_PREFIX "/" IOTHUB_SDK_VERSION;
97
// ASCII-only lower-casing: maps 'A'..'Z' to 'a'..'z' and yields any other value unchanged.
// The argument and both result arms are parenthesized so that expression
// arguments (e.g. a conditional expression) are handled correctly.
// The argument is evaluated more than once, so it must be free of side effects.
#define TOLOWER(c) ((((c) >= 'A') && ((c) <= 'Z')) ? ((c) - 'A' + 'a') : (c))
99
// Subscription flags. Each SUBSCRIBE_* value occupies a distinct bit, so several
// can be OR-ed into one mask (presumably MQTTTRANSPORT_HANDLE_DATA.topics_ToSubscribe
// -- confirm at the use sites). UNSUBSCRIBE_FROM_TOPIC is the empty mask.
100#define UNSUBSCRIBE_FROM_TOPIC 0x0000
101#define SUBSCRIBE_GET_REPORTED_STATE_TOPIC 0x0001
102#define SUBSCRIBE_NOTIFICATION_STATE_TOPIC 0x0002
103#define SUBSCRIBE_TELEMETRY_TOPIC 0x0004
104#define SUBSCRIBE_DEVICE_METHOD_TOPIC 0x0008
105#define SUBSCRIBE_INPUT_QUEUE_TOPIC 0x0010
// Number of SUBSCRIBE_* flags defined above.
106#define SUBSCRIBE_TOPIC_COUNT 5
107
// Macro from an included header; by its name it emits the string table for the
// MQTT_CLIENT_EVENT_ERROR enum -- NOTE(review): confirm against the macro's definition.
108MU_DEFINE_ENUM_STRINGS_WITHOUT_INVALID(MQTT_CLIENT_EVENT_ERROR, MQTT_CLIENT_EVENT_ERROR_VALUES)
109
// One entry of the system-property table: a string and its length.
typedef struct SYSTEM_PROPERTY_INFO_TAG
{
    const char* propName;   // text of the entry
    size_t propLength;      // number of characters in propName, excluding the terminating NUL
} SYSTEM_PROPERTY_INFO;

// Builds a table entry whose length is taken from the string literal itself,
// so the text and its length cannot drift apart.
#define SYS_PROP_ENTRY(literal) { literal, sizeof(literal) - 1 }

// Table of property-name strings together with their lengths.
// "%24" is the URL-encoded form of '$'.
static SYSTEM_PROPERTY_INFO sysPropList[] = {
    SYS_PROP_ENTRY("%24.exp"),
    SYS_PROP_ENTRY("%24.mid"),
    SYS_PROP_ENTRY("%24.uid"),
    SYS_PROP_ENTRY("%24.to"),
    SYS_PROP_ENTRY("%24.cid"),
    SYS_PROP_ENTRY("%24.ct"),
    SYS_PROP_ENTRY("%24.ce"),
    SYS_PROP_ENTRY("devices/"),
    SYS_PROP_ENTRY("iothub-operation"),
    SYS_PROP_ENTRY("iothub-ack"),
    SYS_PROP_ENTRY("%24.on"),
    SYS_PROP_ENTRY("%24.cdid"),
    SYS_PROP_ENTRY("%24.cmid")
};

#undef SYS_PROP_ENTRY

// NOTE(review): presumably the number of '/' separators that precede the input
// name in a module input topic -- confirm at the use site.
static const int slashes_to_reach_input_name = 5;
133
// Kind of device-twin request recorded in MQTT_DEVICE_TWIN_ITEM.device_twin_msg_type.
134typedef enum DEVICE_TWIN_MSG_TYPE_TAG
135{
136 REPORTED_STATE,
137 RETRIEVE_PROPERTIES
138} DEVICE_TWIN_MSG_TYPE;
139
// Credential kinds known to the transport.
// NOTE(review): how each value is selected is not visible in this part of the file.
140typedef enum MQTT_TRANSPORT_CREDENTIAL_TYPE_TAG
141{
142 CREDENTIAL_NOT_BUILD,
143 X509,
144 SAS_TOKEN_FROM_USER,
145 DEVICE_KEY,
146} MQTT_TRANSPORT_CREDENTIAL_TYPE;
147
// Connection state values stored in MQTTTRANSPORT_HANDLE_DATA.mqttClientStatus.
148typedef enum MQTT_CLIENT_STATUS_TAG
149{
150 MQTT_CLIENT_STATUS_NOT_CONNECTED,
151 MQTT_CLIENT_STATUS_CONNECTING,
152 MQTT_CLIENT_STATUS_CONNECTED,
153 MQTT_CLIENT_STATUS_PENDING_CLOSE,
154 MQTT_CLIENT_STATUS_EXECUTE_DISCONNECT
155} MQTT_CLIENT_STATUS;
156
// State of one MQTT transport instance. The opaque TRANSPORT_LL_HANDLE received by
// the public functions is cast to a pointer to this structure
// (see IoTHubTransport_MQTT_Common_SetRetryPolicy). The strings, handles and proxy
// buffers it holds are released by free_transport_handle_data().
157typedef struct MQTTTRANSPORT_HANDLE_DATA_TAG
158{
159 // Topic control
160 STRING_HANDLE topic_MqttEvent;
161 STRING_HANDLE topic_MqttMessage;
162 STRING_HANDLE topic_GetState;
163 STRING_HANDLE topic_NotifyState;
164 STRING_HANDLE topic_InputQueue;
165
166 STRING_HANDLE topic_DeviceMethods;
167
168 uint32_t topics_ToSubscribe;
169
170 // Connection related constants
171 STRING_HANDLE hostAddress;
172 STRING_HANDLE device_id;
173 STRING_HANDLE module_id;
174 STRING_HANDLE devicesAndModulesPath;
175 int portNum;
176 // conn_attempted indicates whether a connection has *ever* been attempted on the lifetime
177 // of this handle. Even if a given xio transport is added/removed, this always stays true.
178 bool conn_attempted;
179
180 MQTT_GET_IO_TRANSPORT get_io_transport;
181
182 // The current mqtt iothub implementation requires that the hub name and the domain suffix be passed as the first of a series of segments
183 // passed through the username portion of the connection frame.
184 // The second segment will contain the device id. The two segments are delimited by a "/".
185 // The first segment can be a maximum 256 characters.
186 // The second segment can be a maximum 128 characters.
187 // With the / delimiter you have 384 chars (Plus a terminator of 0).
188 STRING_HANDLE configPassedThroughUsername;
189
190 // Protocol
191 MQTT_CLIENT_HANDLE mqttClient;
192 XIO_HANDLE xioTransport;
193
194 // Session - connection
// packetId holds the last id handed out by get_next_packet_id().
195 uint16_t packetId;
196 uint16_t twin_resp_packet_id;
197
198 // Connection state control
199 bool isRegistered;
200 MQTT_CLIENT_STATUS mqttClientStatus;
201 bool isDestroyCalled;
202 bool isRetryExpiredCallbackSet;
203 bool device_twin_get_sent;
204 bool twin_resp_sub_recv;
205 bool isRecoverableError;
206 uint16_t keepAliveValue;
207 uint16_t connect_timeout_in_sec;
208 tickcounter_ms_t mqtt_connect_time;
209 size_t connectFailCount;
210 tickcounter_ms_t connectTick;
211 bool log_trace;
212 bool raw_trace;
// Tick counter used to timestamp published messages and twin items.
213 TICK_COUNTER_HANDLE msgTickCounter;
214 OPTIONHANDLER_HANDLE saved_tls_options; // Here are the options from the xio layer if any is saved.
215
216 // Internal lists for message tracking
217 PDLIST_ENTRY waitingToSend;
218 DLIST_ENTRY ack_waiting_queue;
219
220 DLIST_ENTRY pending_get_twin_queue;
221
222 // Message tracking
223 CONTROL_PACKET_TYPE currPacketState;
224
225 // Telemetry specific
226 DLIST_ENTRY telemetry_waitingForAck;
// When true, property keys/values are URL-encoded while the publish topic is built
// (see publish_mqtt_telemetry_msg()).
227 bool auto_url_encode_decode;
228
229 // Controls frequency of reconnection logic.
230 RETRY_CONTROL_HANDLE retry_control_handle;
231
232 // Auth module used to generate handle authorization
233 // with either SAS Token, x509 Certs, and Device SAS Token
234 IOTHUB_AUTHORIZATION_HANDLE authorization_module;
235
// Upper-layer callbacks and the context pointer passed back to them
// (see sendMsgComplete()).
236 TRANSPORT_CALLBACKS_INFO transport_callbacks;
237 void* transport_ctx;
238
// HTTP proxy settings; the three strings are freed by free_proxy_data().
239 char* http_proxy_hostname;
240 int http_proxy_port;
241 char* http_proxy_username;
242 char* http_proxy_password;
243 bool isConnectUsernameSet;
244 int disconnect_recv_flag;
245} MQTTTRANSPORT_HANDLE_DATA, *PMQTTTRANSPORT_HANDLE_DATA;
246
// Tracking record for one device-twin request. create_device_twin_message()
// allocates it zero-filled and sets msgCreationTime, packet_id, iothub_msg_id and
// device_twin_msg_type; the remaining fields are filled in by other code.
247typedef struct MQTT_DEVICE_TWIN_ITEM_TAG
248{
249 tickcounter_ms_t msgCreationTime;
250 tickcounter_ms_t msgPublishTime;
251 size_t retryCount;
252 uint16_t packet_id;
253 uint32_t iothub_msg_id;
254 IOTHUB_DEVICE_TWIN* device_twin_data;
255 DEVICE_TWIN_MSG_TYPE device_twin_msg_type;
// List link; publish_device_twin_get_message() inserts it into ack_waiting_queue.
256 DLIST_ENTRY entry;
257 IOTHUB_CLIENT_DEVICE_TWIN_CALLBACK userCallback;
258 void* userContext;
259} MQTT_DEVICE_TWIN_ITEM;
260
// Tracking record for one telemetry message. publish_mqtt_telemetry_msg() reads
// packet_id and iotHubMessageEntry->messageHandle, stamps msgPublishTime and
// increments retryCount after a successful publish.
261typedef struct MQTT_MESSAGE_DETAILS_LIST_TAG
262{
263 tickcounter_ms_t msgPublishTime;
264 size_t retryCount;
265 IOTHUB_MESSAGE_LIST* iotHubMessageEntry;
266 void* context;
267 uint16_t packet_id;
268 DLIST_ENTRY entry;
269} MQTT_MESSAGE_DETAILS_LIST, *PMQTT_MESSAGE_DETAILS_LIST;
270
// Holds a request id as a STRING_HANDLE.
// NOTE(review): presumably the "$rid" of a pending device-method call -- confirm at the use sites.
271typedef struct DEVICE_METHOD_INFO_TAG
272{
273 STRING_HANDLE request_id;
274} DEVICE_METHOD_INFO;
275
276static void free_proxy_data(MQTTTRANSPORT_HANDLE_DATA* mqtt_transport_instance)
277{
278 if (mqtt_transport_instance->http_proxy_hostname != NULL)
279 {
280 free(mqtt_transport_instance->http_proxy_hostname);
281 mqtt_transport_instance->http_proxy_hostname = NULL;
282 }
283
284 if (mqtt_transport_instance->http_proxy_username != NULL)
285 {
286 free(mqtt_transport_instance->http_proxy_username);
287 mqtt_transport_instance->http_proxy_username = NULL;
288 }
289
290 if (mqtt_transport_instance->http_proxy_password != NULL)
291 {
292 free(mqtt_transport_instance->http_proxy_password);
293 mqtt_transport_instance->http_proxy_password = NULL;
294 }
295}
296
297// Destroys xio transport associated with MQTT handle and resets appropriate state
298static void DestroyXioTransport(PMQTTTRANSPORT_HANDLE_DATA transport_data)
299{
300 xio_destroy(transport_data->xioTransport);
301 transport_data->xioTransport = NULL;
302}
303
304static void set_saved_tls_options(PMQTTTRANSPORT_HANDLE_DATA transport, OPTIONHANDLER_HANDLE new_options)
305{
306 if (transport->saved_tls_options != NULL)
307 {
308 OptionHandler_Destroy(transport->saved_tls_options);
309 }
310 transport->saved_tls_options = new_options;
311}
312
313static void free_transport_handle_data(MQTTTRANSPORT_HANDLE_DATA* transport_data)
314{
315 if (transport_data->mqttClient != NULL)
316 {
317 mqtt_client_deinit(transport_data->mqttClient);
318 }
319
320 if (transport_data->retry_control_handle != NULL)
321 {
322 retry_control_destroy(transport_data->retry_control_handle);
323 }
324
325 set_saved_tls_options(transport_data, NULL);
326
327 tickcounter_destroy(transport_data->msgTickCounter);
328
329 free_proxy_data(transport_data);
330
331 STRING_delete(transport_data->devicesAndModulesPath);
332 STRING_delete(transport_data->topic_MqttEvent);
333 STRING_delete(transport_data->topic_MqttMessage);
334 STRING_delete(transport_data->device_id);
335 STRING_delete(transport_data->module_id);
336 STRING_delete(transport_data->hostAddress);
337 STRING_delete(transport_data->configPassedThroughUsername);
338 STRING_delete(transport_data->topic_GetState);
339 STRING_delete(transport_data->topic_NotifyState);
340 STRING_delete(transport_data->topic_DeviceMethods);
341 STRING_delete(transport_data->topic_InputQueue);
342
343 DestroyXioTransport(transport_data);
344
345 free(transport_data);
346}
347
348int IoTHubTransport_MQTT_Common_SetRetryPolicy(TRANSPORT_LL_HANDLE handle, IOTHUB_CLIENT_RETRY_POLICY retryPolicy, size_t retryTimeoutLimitInSeconds)
349{
350 int result;
351
352 if (handle == NULL)
353 {
354 /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_25_041: [**If any handle is NULL then IoTHubTransport_MQTT_Common_SetRetryPolicy shall return resultant line.] */
355 LogError("Invalid handle parameter. NULL.");
356 result = MU_FAILURE;
357 }
358 else
359 {
360 RETRY_CONTROL_HANDLE new_retry_control_handle;
361
362 // Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_09_006: [ IoTHubTransport_MQTT_Common_SetRetryPolicy shall set the retry logic by calling retry_control_create() with retry policy and retryTimeout as parameters]
363 // Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_09_009: [ If retry_control_create() fails then IoTHubTransport_MQTT_Common_SetRetryPolicy shall revert to previous retry policy and return non-zero value ]
364 if ((new_retry_control_handle = retry_control_create(retryPolicy, (unsigned int)retryTimeoutLimitInSeconds)) == NULL)
365 {
366 LogError("Failed creating new retry control handle");
367 result = MU_FAILURE;
368 }
369 else
370 {
371 PMQTTTRANSPORT_HANDLE_DATA transport_data = (PMQTTTRANSPORT_HANDLE_DATA)handle;
372 RETRY_CONTROL_HANDLE previous_retry_control_handle = transport_data->retry_control_handle;
373
374 transport_data->retry_control_handle = new_retry_control_handle;
375 retry_control_destroy(previous_retry_control_handle);
376
377 /*Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_25_045: [**If retry logic for specified parameters of retry policy and retryTimeoutLimitInSeconds is created successfully then IoTHubTransport_MQTT_Common_SetRetryPolicy shall return 0]*/
378 result = 0;
379 }
380 }
381
382 return result;
383}
384
385static uint16_t get_next_packet_id(PMQTTTRANSPORT_HANDLE_DATA transport_data)
386{
387 if (transport_data->packetId + 1 >= USHRT_MAX)
388 {
389 transport_data->packetId = 1;
390 }
391 else
392 {
393 transport_data->packetId++;
394 }
395 return transport_data->packetId;
396}
397
398#ifndef NO_LOGGING
399static const char* retrieve_mqtt_return_codes(CONNECT_RETURN_CODE rtn_code)
400{
401 switch (rtn_code)
402 {
403 case CONNECTION_ACCEPTED:
404 return "Accepted";
405 case CONN_REFUSED_UNACCEPTABLE_VERSION:
406 return "Unacceptable Version";
407 case CONN_REFUSED_ID_REJECTED:
408 return "Id Rejected";
409 case CONN_REFUSED_SERVER_UNAVAIL:
410 return "Server Unavailable";
411 case CONN_REFUSED_BAD_USERNAME_PASSWORD:
412 return "Bad Username/Password";
413 case CONN_REFUSED_NOT_AUTHORIZED:
414 return "Not Authorized";
415 case CONN_REFUSED_UNKNOWN:
416 default:
417 return "Unknown";
418 }
419}
420#endif // NO_LOGGING
421
422static int retrieve_device_method_rid_info(const char* resp_topic, STRING_HANDLE method_name, STRING_HANDLE request_id)
423{
424 int result;
425
426 STRING_TOKENIZER_HANDLE token_handle = STRING_TOKENIZER_create_from_char(resp_topic);
427 if (token_handle == NULL)
428 {
429 LogError("Failed creating token from device twin topic.");
430 result = MU_FAILURE;
431 }
432 else
433 {
434 STRING_HANDLE token_value;
435 if ((token_value = STRING_new()) == NULL)
436 {
437 LogError("Failed allocating new string .");
438 result = MU_FAILURE;
439 }
440 else
441 {
442 size_t token_index = 0;
443 size_t request_id_length = strlen(REQUEST_ID_PROPERTY);
444 result = MU_FAILURE;
445 while (STRING_TOKENIZER_get_next_token(token_handle, token_value, "/") == 0)
446 {
447 if (token_index == 3)
448 {
449 if (STRING_concat_with_STRING(method_name, token_value) != 0)
450 {
451 LogError("Failed STRING_concat_with_STRING.");
452 result = MU_FAILURE;
453 break;
454 }
455 }
456 else if (token_index == 4)
457 {
458 if (STRING_length(token_value) >= request_id_length)
459 {
460 const char* request_id_value = STRING_c_str(token_value);
461 if (memcmp(request_id_value, REQUEST_ID_PROPERTY, request_id_length) == 0)
462 {
463 if (STRING_concat(request_id, request_id_value + request_id_length) != 0)
464 {
465 LogError("Failed STRING_concat failed.");
466 result = MU_FAILURE;
467 }
468 else
469 {
470 result = 0;
471 }
472 break;
473 }
474 }
475 }
476 token_index++;
477 }
478 STRING_delete(token_value);
479 }
480 STRING_TOKENIZER_destroy(token_handle);
481 }
482
483 return result;
484}
485
486static int parse_device_twin_topic_info(const char* resp_topic, bool* patch_msg, size_t* request_id, int* status_code)
487{
488 int result;
489 STRING_TOKENIZER_HANDLE token_handle = STRING_TOKENIZER_create_from_char(resp_topic);
490 if (token_handle == NULL)
491 {
492 LogError("Failed creating token from device twin topic.");
493 result = MU_FAILURE;
494 *status_code = 0;
495 *request_id = 0;
496 *patch_msg = false;
497 }
498 else
499 {
500 STRING_HANDLE token_value;
501 if ((token_value = STRING_new()) == NULL)
502 {
503 LogError("Failed allocating new string .");
504 result = MU_FAILURE;
505 *status_code = 0;
506 *request_id = 0;
507 *patch_msg = false;
508 }
509 else
510 {
511 result = MU_FAILURE;
512 size_t token_count = 0;
513 while (STRING_TOKENIZER_get_next_token(token_handle, token_value, "/") == 0)
514 {
515 if (token_count == 2)
516 {
517 if (strcmp(STRING_c_str(token_value), "PATCH") == 0)
518 {
519 *patch_msg = true;
520 *status_code = 0;
521 *request_id = 0;
522 result = 0;
523 break;
524 }
525 else
526 {
527 *patch_msg = false;
528 }
529 }
530 else if (token_count == 3)
531 {
532 *status_code = (int)atol(STRING_c_str(token_value));
533 if (STRING_TOKENIZER_get_next_token(token_handle, token_value, "/?$rid=") == 0)
534 {
535 *request_id = (size_t)atol(STRING_c_str(token_value));
536 }
537 *patch_msg = false;
538 result = 0;
539 break;
540 }
541 token_count++;
542 }
543 STRING_delete(token_value);
544 }
545 STRING_TOKENIZER_destroy(token_handle);
546 }
547 return result;
548}
549
// Case-insensitive (ASCII only) comparison of at most n characters.
//
//   s1, s2 - strings to compare; either may be NULL.
//   n      - maximum number of characters to examine.
//
// Returns 0 when the strings are equal over the examined range, a negative
// value when s1 sorts first (or is NULL) and a positive value when s2 sorts
// first (or is NULL while s1 is not).
// Two strings that both end before n characters were examined are equal.
static int InternStrnicmp(const char* s1, const char* s2, size_t n)
{
    int result;

    if (s1 == NULL) result = -1;
    else if (s2 == NULL) result = 1;
    else
    {
        result = 0;

        while (n-- && result == 0)
        {
            if (*s1 == 0 && *s2 == 0)
            {
                // Both strings ended together: everything matched.
                break;
            }
            else if (*s1 == 0) result = -1;
            else if (*s2 == 0) result = 1;
            else
            {
                // Fold 'A'..'Z' to lower case; same ASCII-only rule as the file's TOLOWER macro.
                int c1 = *s1;
                int c2 = *s2;
                if ((c1 >= 'A') && (c1 <= 'Z')) c1 = c1 - 'A' + 'a';
                if ((c2 >= 'A') && (c2 <= 'Z')) c2 = c2 - 'A' + 'a';

                result = c1 - c2;
                ++s1;
                ++s2;
            }
        }
    }
    return result;
}
575
576static IOTHUB_IDENTITY_TYPE retrieve_topic_type(const char* topic_resp, const char* input_queue)
577{
578 IOTHUB_IDENTITY_TYPE type;
579 if (InternStrnicmp(topic_resp, TOPIC_DEVICE_TWIN_PREFIX, sizeof(TOPIC_DEVICE_TWIN_PREFIX) - 1) == 0)
580 {
581 type = IOTHUB_TYPE_DEVICE_TWIN;
582 }
583 else if (InternStrnicmp(topic_resp, TOPIC_DEVICE_METHOD_PREFIX, sizeof(TOPIC_DEVICE_METHOD_PREFIX) - 1) == 0)
584 {
585 type = IOTHUB_TYPE_DEVICE_METHODS;
586 }
587 // input_queue contains additional "#" from subscribe, which we strip off on comparing incoming.
588 else if ((input_queue != NULL) && InternStrnicmp(topic_resp, input_queue, strlen(input_queue) - 1) == 0)
589 {
590 type = IOTHUB_TYPE_EVENT_QUEUE;
591 }
592 else
593 {
594 type = IOTHUB_TYPE_TELEMETRY;
595 }
596 return type;
597
598}
599
600static void sendMsgComplete(IOTHUB_MESSAGE_LIST* iothubMsgList, PMQTTTRANSPORT_HANDLE_DATA transport_data, IOTHUB_CLIENT_CONFIRMATION_RESULT confirmResult)
601{
602 DLIST_ENTRY messageCompleted;
603 DList_InitializeListHead(&messageCompleted);
604 DList_InsertTailList(&messageCompleted, &(iothubMsgList->entry));
605 transport_data->transport_callbacks.send_complete_cb(&messageCompleted, confirmResult, transport_data->transport_ctx);
606}
607
608static int addUserPropertiesTouMqttMessage(IOTHUB_MESSAGE_HANDLE iothub_message_handle, STRING_HANDLE topic_string, size_t* index_ptr, bool urlencode)
609{
610 int result = 0;
611 const char* const* propertyKeys;
612 const char* const* propertyValues;
613 size_t propertyCount;
614 size_t index = *index_ptr;
615 MAP_HANDLE properties_map = IoTHubMessage_Properties(iothub_message_handle);
616 if (properties_map != NULL)
617 {
618 if (Map_GetInternals(properties_map, &propertyKeys, &propertyValues, &propertyCount) != MAP_OK)
619 {
620 LogError("Failed to get the internals of the property map.");
621 result = MU_FAILURE;
622 }
623 else
624 {
625 if (propertyCount != 0)
626 {
627 for (index = 0; index < propertyCount && result == 0; index++)
628 {
629 if (urlencode)
630 {
631 STRING_HANDLE property_key = URL_EncodeString(propertyKeys[index]);
632 STRING_HANDLE property_value = URL_EncodeString(propertyValues[index]);
633 if ((property_key == NULL) || (property_value == NULL))
634 {
635 LogError("Failed URL Encoding properties");
636 result = MU_FAILURE;
637 }
638 else if (STRING_sprintf(topic_string, "%s=%s%s", STRING_c_str(property_key), STRING_c_str(property_value), propertyCount - 1 == index ? "" : PROPERTY_SEPARATOR) != 0)
639 {
640 LogError("Failed constructing property string.");
641 result = MU_FAILURE;
642 }
643 STRING_delete(property_key);
644 STRING_delete(property_value);
645 }
646 else
647 {
648 if (STRING_sprintf(topic_string, "%s=%s%s", propertyKeys[index], propertyValues[index], propertyCount - 1 == index ? "" : PROPERTY_SEPARATOR) != 0)
649 {
650 LogError("Failed constructing property string.");
651 result = MU_FAILURE;
652 }
653 }
654 }
655 }
656 }
657 }
658 *index_ptr = index;
659 return result;
660}
661
662static int addSystemPropertyToTopicString(STRING_HANDLE topic_string, size_t index, const char* property_key, const char* property_value, bool urlencode)
663{
664 int result = 0;
665
666 if (urlencode)
667 {
668 STRING_HANDLE encoded_property_value = URL_EncodeString(property_value);
669 if (encoded_property_value == NULL)
670 {
671 LogError("Failed URL encoding %s.", property_key);
672 result = MU_FAILURE;
673 }
674 else if (STRING_sprintf(topic_string, "%s%%24.%s=%s", index == 0 ? "" : PROPERTY_SEPARATOR, property_key, STRING_c_str(encoded_property_value)) != 0)
675 {
676 LogError("Failed setting %s.", property_key);
677 result = MU_FAILURE;
678 }
679 STRING_delete(encoded_property_value);
680 }
681 else
682 {
683 if (STRING_sprintf(topic_string, "%s%%24.%s=%s", index == 0 ? "" : PROPERTY_SEPARATOR, property_key, property_value) != 0)
684 {
685 LogError("Failed setting %s.", property_key);
686 result = MU_FAILURE;
687 }
688 }
689 return result;
690}
691
692static int addSystemPropertiesTouMqttMessage(IOTHUB_MESSAGE_HANDLE iothub_message_handle, STRING_HANDLE topic_string, size_t* index_ptr, bool urlencode)
693{
694 int result = 0;
695 size_t index = *index_ptr;
696
697 bool is_security_msg = IoTHubMessage_IsSecurityMessage(iothub_message_handle);
698 /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_07_052: [ IoTHubTransport_MQTT_Common_DoWork shall check for the CorrelationId property and if found add the value as a system property in the format of $.cid=<id> ] */
699 const char* correlation_id = IoTHubMessage_GetCorrelationId(iothub_message_handle);
700 if (correlation_id != NULL)
701 {
702 result = addSystemPropertyToTopicString(topic_string, index, CORRELATION_ID_PROPERTY, correlation_id, urlencode);
703 index++;
704 }
705 /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_07_053: [ IoTHubTransport_MQTT_Common_DoWork shall check for the MessageId property and if found add the value as a system property in the format of $.mid=<id> ] */
706 if (result == 0)
707 {
708 const char* msg_id = IoTHubMessage_GetMessageId(iothub_message_handle);
709 if (msg_id != NULL)
710 {
711 result = addSystemPropertyToTopicString(topic_string, index, MESSAGE_ID_PROPERTY, msg_id, urlencode);
712 index++;
713 }
714 }
715 // Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_09_010: [ `IoTHubTransport_MQTT_Common_DoWork` shall check for the ContentType property and if found add the `value` as a system property in the format of `$.ct=<value>` ]
716 if (result == 0)
717 {
718 const char* content_type = IoTHubMessage_GetContentTypeSystemProperty(iothub_message_handle);
719 if (content_type != NULL)
720 {
721 result = addSystemPropertyToTopicString(topic_string, index, CONTENT_TYPE_PROPERTY, content_type, urlencode);
722 index++;
723 }
724 }
725 // Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_09_011: [ `IoTHubTransport_MQTT_Common_DoWork` shall check for the ContentEncoding property and if found add the `value` as a system property in the format of `$.ce=<value>` ]
726 if (result == 0)
727 {
728 const char* content_encoding = IoTHubMessage_GetContentEncodingSystemProperty(iothub_message_handle);
729 if (content_encoding != NULL)
730 {
731 // Security message require content encoding
732 result = addSystemPropertyToTopicString(topic_string, index, CONTENT_ENCODING_PROPERTY, content_encoding, is_security_msg ? true : urlencode);
733 index++;
734 }
735 }
736 if (result == 0)
737 {
738 if (is_security_msg)
739 {
740 // The Security interface Id value must be encoded
741 if (addSystemPropertyToTopicString(topic_string, index++, SECURITY_INTERFACE_ID_MQTT, SECURITY_INTERFACE_ID_VALUE, true) != 0)
742 {
743 LogError("Failed setting Security interface id");
744 result = MU_FAILURE;
745 }
746 else
747 {
748 result = 0;
749 }
750 }
751 }
752 *index_ptr = index;
753 return result;
754}
755
756static int addDiagnosticPropertiesTouMqttMessage(IOTHUB_MESSAGE_HANDLE iothub_message_handle, STRING_HANDLE topic_string, size_t* index_ptr)
757{
758 int result = 0;
759 size_t index = *index_ptr;
760
761 // Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_09_014: [ `IoTHubTransport_MQTT_Common_DoWork` shall check for the diagnostic properties including diagid and diagCreationTimeUtc and if found both add them as system property in the format of `$.diagid` and `$.diagctx` respectively]
762 const IOTHUB_MESSAGE_DIAGNOSTIC_PROPERTY_DATA* diagnosticData = IoTHubMessage_GetDiagnosticPropertyData(iothub_message_handle);
763 if (diagnosticData != NULL)
764 {
765 const char* diag_id = diagnosticData->diagnosticId;
766 const char* creation_time_utc = diagnosticData->diagnosticCreationTimeUtc;
767 //diagid and creationtimeutc must be present/unpresent simultaneously
768 if (diag_id != NULL && creation_time_utc != NULL)
769 {
770 if (STRING_sprintf(topic_string, "%s%%24.%s=%s", index == 0 ? "" : PROPERTY_SEPARATOR, DIAGNOSTIC_ID_PROPERTY, diag_id) != 0)
771 {
772 LogError("Failed setting diagnostic id");
773 result = MU_FAILURE;
774 }
775 index++;
776
777 if (result == 0)
778 {
779 //construct diagnostic context, it should be urlencode(key1=value1,key2=value2)
780 STRING_HANDLE diagContextHandle = STRING_construct_sprintf("%s=%s", DIAGNOSTIC_CONTEXT_CREATION_TIME_UTC_PROPERTY, creation_time_utc);
781 if (diagContextHandle == NULL)
782 {
783 LogError("Failed constructing diagnostic context");
784 result = MU_FAILURE;
785 }
786 else
787 {
788 //Add other diagnostic context properties here if have more
789 STRING_HANDLE encodedContextValueHandle = URL_Encode(diagContextHandle);
790 const char* encodedContextValueString = NULL;
791 if (encodedContextValueHandle != NULL &&
792 (encodedContextValueString = STRING_c_str(encodedContextValueHandle)) != NULL)
793 {
794 if (STRING_sprintf(topic_string, "%s%%24.%s=%s", index == 0 ? "" : PROPERTY_SEPARATOR, DIAGNOSTIC_CONTEXT_PROPERTY, encodedContextValueString) != 0)
795 {
796 LogError("Failed setting diagnostic context");
797 result = MU_FAILURE;
798 }
799 STRING_delete(encodedContextValueHandle);
800 encodedContextValueHandle = NULL;
801 }
802 else
803 {
804 LogError("Failed encoding diagnostic context value");
805 result = MU_FAILURE;
806 }
807 STRING_delete(diagContextHandle);
808 diagContextHandle = NULL;
809 index++;
810 }
811 }
812 }
813 else if (diag_id != NULL || creation_time_utc != NULL)
814 {
815 // Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_09_015: [ `IoTHubTransport_MQTT_Common_DoWork` shall check whether diagid and diagCreationTimeUtc be present simultaneously, treat as error if not]
816 LogError("diagid and diagcreationtimeutc must be present simultaneously.");
817 result = MU_FAILURE;
818 }
819 }
820 return result;
821}
822
823
824static STRING_HANDLE addPropertiesTouMqttMessage(IOTHUB_MESSAGE_HANDLE iothub_message_handle, const char* eventTopic, bool urlencode)
825{
826 size_t index = 0;
827 STRING_HANDLE result = STRING_construct(eventTopic);
828 if (result == NULL)
829 {
830 LogError("Failed to create event topic string handle");
831 }
832 else if (addUserPropertiesTouMqttMessage(iothub_message_handle, result, &index, urlencode) != 0)
833 {
834 LogError("Failed adding Properties to uMQTT Message");
835 STRING_delete(result);
836 result = NULL;
837 }
838 else if (addSystemPropertiesTouMqttMessage(iothub_message_handle, result, &index, urlencode) != 0)
839 {
840 LogError("Failed adding System Properties to uMQTT Message");
841 STRING_delete(result);
842 result = NULL;
843 }
844 else if (addDiagnosticPropertiesTouMqttMessage(iothub_message_handle, result, &index) != 0)
845 {
846 LogError("Failed adding Diagnostic Properties to uMQTT Message");
847 STRING_delete(result);
848 result = NULL;
849 }
850
851 // Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_31_060: [ `IoTHubTransport_MQTT_Common_DoWork` shall check for the OutputName property and if found add the value as a system property in the format of $.on=<value> ]
852 if (result != NULL)
853 {
854 const char* output_name = IoTHubMessage_GetOutputName(iothub_message_handle);
855 if (output_name != NULL)
856 {
857 if (STRING_sprintf(result, "%s%%24.on=%s/", index == 0 ? "" : PROPERTY_SEPARATOR, output_name) != 0)
858 {
859 LogError("Failed setting output name.");
860 STRING_delete(result);
861 result = NULL;
862 }
863 index++;
864 }
865 }
866
867 return result;
868}
869
870static int publish_mqtt_telemetry_msg(PMQTTTRANSPORT_HANDLE_DATA transport_data, MQTT_MESSAGE_DETAILS_LIST* mqttMsgEntry, const unsigned char* payload, size_t len)
871{
872 int result;
873 STRING_HANDLE msgTopic = addPropertiesTouMqttMessage(mqttMsgEntry->iotHubMessageEntry->messageHandle, STRING_c_str(transport_data->topic_MqttEvent), transport_data->auto_url_encode_decode);
874 if (msgTopic == NULL)
875 {
876 LogError("Failed adding properties to mqtt message");
877 result = MU_FAILURE;
878 }
879 else
880 {
881 MQTT_MESSAGE_HANDLE mqttMsg = mqttmessage_create_in_place(mqttMsgEntry->packet_id, STRING_c_str(msgTopic), DELIVER_AT_LEAST_ONCE, payload, len);
882 if (mqttMsg == NULL)
883 {
884 LogError("Failed creating mqtt message");
885 result = MU_FAILURE;
886 }
887 else
888 {
889 if (tickcounter_get_current_ms(transport_data->msgTickCounter, &mqttMsgEntry->msgPublishTime) != 0)
890 {
891 LogError("Failed retrieving tickcounter info");
892 result = MU_FAILURE;
893 }
894 else
895 {
896 if (mqtt_client_publish(transport_data->mqttClient, mqttMsg) != 0)
897 {
898 LogError("Failed attempting to publish mqtt message");
899 result = MU_FAILURE;
900 }
901 else
902 {
903 mqttMsgEntry->retryCount++;
904 result = 0;
905 }
906 }
907 mqttmessage_destroy(mqttMsg);
908 }
909 STRING_delete(msgTopic);
910 }
911 return result;
912}
913
914static int publish_device_method_message(MQTTTRANSPORT_HANDLE_DATA* transport_data, int status_code, STRING_HANDLE request_id, const unsigned char* response, size_t response_size)
915{
916 int result;
917 uint16_t packet_id = get_next_packet_id(transport_data);
918
919 STRING_HANDLE msg_topic = STRING_construct_sprintf(DEVICE_METHOD_RESPONSE_TOPIC, status_code, STRING_c_str(request_id));
920 if (msg_topic == NULL)
921 {
922 LogError("Failed constructing message topic.");
923 result = MU_FAILURE;
924 }
925 else
926 {
927 MQTT_MESSAGE_HANDLE mqtt_get_msg = mqttmessage_create_in_place(packet_id, STRING_c_str(msg_topic), DELIVER_AT_MOST_ONCE, response, response_size);
928 if (mqtt_get_msg == NULL)
929 {
930 LogError("Failed constructing mqtt message.");
931 result = MU_FAILURE;
932 }
933 else
934 {
935 if (mqtt_client_publish(transport_data->mqttClient, mqtt_get_msg) != 0)
936 {
937 LogError("Failed publishing to mqtt client.");
938 result = MU_FAILURE;
939 }
940 else
941 {
942 result = 0;
943 }
944 mqttmessage_destroy(mqtt_get_msg);
945 }
946 STRING_delete(msg_topic);
947 }
948 return result;
949}
950
951
952static void destroy_device_twin_get_message(MQTT_DEVICE_TWIN_ITEM* msg_entry)
953{
954 free(msg_entry);
955}
956
957static MQTT_DEVICE_TWIN_ITEM* create_device_twin_message(MQTTTRANSPORT_HANDLE_DATA* transport_data, DEVICE_TWIN_MSG_TYPE device_twin_msg_type, uint32_t iothub_msg_id)
958{
959 MQTT_DEVICE_TWIN_ITEM* result;
960 tickcounter_ms_t current_time;
961
962 if (tickcounter_get_current_ms(transport_data->msgTickCounter, &current_time) != 0)
963 {
964 LogError("Failed retrieving tickcounter info");
965 result = NULL;
966 }
967 else if ((result = (MQTT_DEVICE_TWIN_ITEM*)malloc(sizeof(MQTT_DEVICE_TWIN_ITEM))) == NULL)
968 {
969 LogError("Failed allocating device twin data.");
970 result = NULL;
971 }
972 else
973 {
974 memset(result, 0, sizeof(*result));
975 result->msgCreationTime = current_time;
976 result->packet_id = get_next_packet_id(transport_data);
977 result->iothub_msg_id = iothub_msg_id;
978 result->device_twin_msg_type = device_twin_msg_type;
979 }
980
981 return result;
982}
983
984static int publish_device_twin_get_message(MQTTTRANSPORT_HANDLE_DATA* transport_data, MQTT_DEVICE_TWIN_ITEM* mqtt_info)
985{
986 int result;
987
988 STRING_HANDLE msg_topic = STRING_construct_sprintf(GET_PROPERTIES_TOPIC, mqtt_info->packet_id);
989 if (msg_topic == NULL)
990 {
991 LogError("Failed constructing get Prop topic.");
992 result = MU_FAILURE;
993 }
994 else
995 {
996 MQTT_MESSAGE_HANDLE mqtt_get_msg = mqttmessage_create(mqtt_info->packet_id, STRING_c_str(msg_topic), DELIVER_AT_MOST_ONCE, NULL, 0);
997 if (mqtt_get_msg == NULL)
998 {
999 LogError("Failed constructing mqtt message.");
1000 result = MU_FAILURE;
1001 }
1002 else
1003 {
1004 if (mqtt_client_publish(transport_data->mqttClient, mqtt_get_msg) != 0)
1005 {
1006 LogError("Failed publishing to mqtt client.");
1007 result = MU_FAILURE;
1008 }
1009 else
1010 {
1011 DList_InsertTailList(&transport_data->ack_waiting_queue, &mqtt_info->entry);
1012 result = 0;
1013 }
1014 mqttmessage_destroy(mqtt_get_msg);
1015 }
1016 STRING_delete(msg_topic);
1017 }
1018
1019 return result;
1020}
1021
1022static void sendPendingGetTwinRequests(PMQTTTRANSPORT_HANDLE_DATA transportData)
1023{
1024 PDLIST_ENTRY dev_twin_item = transportData->pending_get_twin_queue.Flink;
1025
1026 while (dev_twin_item != &transportData->pending_get_twin_queue)
1027 {
1028 DLIST_ENTRY saveListEntry;
1029 saveListEntry.Flink = dev_twin_item->Flink;
1030 MQTT_DEVICE_TWIN_ITEM* msg_entry = containingRecord(dev_twin_item, MQTT_DEVICE_TWIN_ITEM, entry);
1031 (void)DList_RemoveEntryList(dev_twin_item);
1032
1033 if (publish_device_twin_get_message(transportData, msg_entry) != 0)
1034 {
1035 LogError("Failed sending pending get twin request");
1036 destroy_device_twin_get_message(msg_entry);
1037 }
1038 else
1039 {
1040 transportData->device_twin_get_sent = true;
1041 }
1042
1043 dev_twin_item = saveListEntry.Flink;
1044 }
1045}
1046
1047
1048static void removeExpiredTwinRequestsFromList(PMQTTTRANSPORT_HANDLE_DATA transport_data, tickcounter_ms_t current_ms, DLIST_ENTRY* twin_list)
1049{
1050 PDLIST_ENTRY list_item = twin_list->Flink;
1051
1052 while (list_item != twin_list)
1053 {
1054 DLIST_ENTRY next_list_item;
1055 next_list_item.Flink = list_item->Flink;
1056 MQTT_DEVICE_TWIN_ITEM* msg_entry = containingRecord(list_item, MQTT_DEVICE_TWIN_ITEM, entry);
1057 bool item_timed_out = false;
1058
1059 if ((msg_entry->device_twin_msg_type == RETRIEVE_PROPERTIES) &&
1060 (((current_ms - msg_entry->msgCreationTime) / 1000) >= ON_DEMAND_GET_TWIN_REQUEST_TIMEOUT_SECS))
1061 {
1062 item_timed_out = true;
1063 if (msg_entry->userCallback != NULL)
1064 {
1065 msg_entry->userCallback(DEVICE_TWIN_UPDATE_COMPLETE, NULL, 0, msg_entry->userContext);
1066 }
1067 }
1068 else if ((msg_entry->device_twin_msg_type == REPORTED_STATE) &&
1069 (((current_ms - msg_entry->msgCreationTime) / 1000) >= TWIN_REPORT_UPDATE_TIMEOUT_SECS))
1070 {
1071 item_timed_out = true;
1072 transport_data->transport_callbacks.twin_rpt_state_complete_cb(msg_entry->iothub_msg_id, STATUS_CODE_TIMEOUT_VALUE, transport_data->transport_ctx);
1073 }
1074
1075 if (item_timed_out)
1076 {
1077 (void)DList_RemoveEntryList(list_item);
1078 destroy_device_twin_get_message(msg_entry);
1079 }
1080
1081 list_item = next_list_item.Flink;
1082 }
1083
1084}
1085
1086static void removeExpiredTwinRequests(PMQTTTRANSPORT_HANDLE_DATA transport_data)
1087{
1088 tickcounter_ms_t current_ms;
1089
1090 if (tickcounter_get_current_ms(transport_data->msgTickCounter, &current_ms) == 0)
1091 {
1092 removeExpiredTwinRequestsFromList(transport_data, current_ms, &transport_data->pending_get_twin_queue);
1093 removeExpiredTwinRequestsFromList(transport_data, current_ms, &transport_data->ack_waiting_queue);
1094 }
1095}
1096
1097static int publish_device_twin_message(MQTTTRANSPORT_HANDLE_DATA* transport_data, IOTHUB_DEVICE_TWIN* device_twin_info, MQTT_DEVICE_TWIN_ITEM* mqtt_info)
1098{
1099 int result;
1100
1101 STRING_HANDLE msgTopic = STRING_construct_sprintf(REPORTED_PROPERTIES_TOPIC, mqtt_info->packet_id);
1102 if (msgTopic == NULL)
1103 {
1104 LogError("Failed constructing reported prop topic.");
1105 result = MU_FAILURE;
1106 }
1107 else
1108 {
1109 const CONSTBUFFER* data_buff;
1110 if ((data_buff = CONSTBUFFER_GetContent(device_twin_info->report_data_handle)) == NULL)
1111 {
1112 LogError("Failed retrieving buffer content");
1113 result = MU_FAILURE;
1114 }
1115 else
1116 {
1117 MQTT_MESSAGE_HANDLE mqtt_rpt_msg = mqttmessage_create_in_place(mqtt_info->packet_id, STRING_c_str(msgTopic), DELIVER_AT_MOST_ONCE, data_buff->buffer, data_buff->size);
1118 if (mqtt_rpt_msg == NULL)
1119 {
1120 LogError("Failed creating mqtt message");
1121 result = MU_FAILURE;
1122 }
1123 else
1124 {
1125 if (tickcounter_get_current_ms(transport_data->msgTickCounter, &mqtt_info->msgPublishTime) != 0)
1126 {
1127 LogError("Failed retrieving tickcounter info");
1128 result = MU_FAILURE;
1129 }
1130 else
1131 {
1132 if (mqtt_client_publish(transport_data->mqttClient, mqtt_rpt_msg) != 0)
1133 {
1134 LogError("Failed publishing mqtt message");
1135 result = MU_FAILURE;
1136 }
1137 else
1138 {
1139 mqtt_info->retryCount++;
1140 result = 0;
1141 }
1142 }
1143 mqttmessage_destroy(mqtt_rpt_msg);
1144 }
1145 }
1146 STRING_delete(msgTopic);
1147 }
1148 return result;
1149}
1150
1151static void changeStateToSubscribeIfAllowed(PMQTTTRANSPORT_HANDLE_DATA transport_data)
1152{
1153 if (transport_data->currPacketState != CONNACK_TYPE &&
1154 transport_data->currPacketState != CONNECT_TYPE &&
1155 transport_data->currPacketState != DISCONNECT_TYPE &&
1156 transport_data->currPacketState != PACKET_TYPE_ERROR)
1157 {
1158 transport_data->currPacketState = SUBSCRIBE_TYPE;
1159 }
1160}
1161
1162static int subscribeToNotifyStateIfNeeded(PMQTTTRANSPORT_HANDLE_DATA transport_data)
1163{
1164 int result;
1165
1166 if (transport_data->topic_NotifyState == NULL)
1167 {
1168 transport_data->topic_NotifyState = STRING_construct(TOPIC_NOTIFICATION_STATE);
1169 if (transport_data->topic_NotifyState == NULL)
1170 {
1171 LogError("Failure: unable constructing notify state topic");
1172 result = MU_FAILURE;
1173 }
1174 else
1175 {
1176 transport_data->topics_ToSubscribe |= SUBSCRIBE_NOTIFICATION_STATE_TOPIC;
1177 result = 0;
1178 }
1179 }
1180 else
1181 {
1182 result = 0;
1183 }
1184
1185 if (result == 0)
1186 {
1187 changeStateToSubscribeIfAllowed(transport_data);
1188 }
1189
1190 return result;
1191}
1192
1193
1194static bool isSystemProperty(const char* tokenData)
1195{
1196 bool result = false;
1197 size_t propCount = sizeof(sysPropList) / sizeof(sysPropList[0]);
1198 size_t index = 0;
1199 for (index = 0; index < propCount; index++)
1200 {
1201 if (memcmp(tokenData, sysPropList[index].propName, sysPropList[index].propLength) == 0)
1202 {
1203 result = true;
1204 break;
1205 }
1206 }
1207 return result;
1208}
1209
1210// Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_31_061: [ If the message is sent to an input queue, `IoTHubTransport_MQTT_Common_DoWork` shall parse out to the input queue name and store it in the message with IoTHubMessage_SetInputName ]
1211// Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_31_062: [ If IoTHubTransport_MQTT_Common_DoWork receives a malformatted inputQueue, it shall fail ]
1212static int addInputNamePropertyToMessage(IOTHUB_MESSAGE_HANDLE IoTHubMessage, const char* topic_name)
1213{
1214 int result = MU_FAILURE;
1215 int number_tokens_read = 0;
1216
1217 STRING_TOKENIZER_HANDLE token_handle = STRING_TOKENIZER_create_from_char(topic_name);
1218 if (token_handle == NULL)
1219 {
1220 LogError("STRING_TOKENIZER_create_from_char failed\n");
1221 result = MU_FAILURE;
1222 }
1223 else
1224 {
1225 STRING_HANDLE token_value;
1226 if ((token_value = STRING_new()) == NULL)
1227 {
1228 LogError("Failed allocating token_value");
1229 }
1230 else
1231 {
1232 while (STRING_TOKENIZER_get_next_token(token_handle, token_value, "/") == 0)
1233 {
1234 number_tokens_read++;
1235 if (number_tokens_read == (slashes_to_reach_input_name + 1))
1236 {
1237 if ((IOTHUB_MESSAGE_OK != IoTHubMessage_SetInputName(IoTHubMessage, STRING_c_str(token_value))))
1238 {
1239 LogError("Failed adding input name to msg");
1240 result = MU_FAILURE;
1241 }
1242 else
1243 {
1244 result = 0;
1245 }
1246 break;
1247 }
1248 }
1249 }
1250 STRING_delete(token_value);
1251
1252 if (number_tokens_read != (slashes_to_reach_input_name + 1))
1253 {
1254 LogError("Not enough '/' to contain input name. Got %d, need at least %d", number_tokens_read, (slashes_to_reach_input_name + 1));
1255 result = MU_FAILURE;
1256 }
1257 STRING_TOKENIZER_destroy(token_handle);
1258 }
1259
1260 return result;
1261}
1262
1263static int setMqttMessagePropertyIfPossible(IOTHUB_MESSAGE_HANDLE IoTHubMessage, const char* propName, const char* propValue, size_t nameLen)
1264{
1265 // Not finding a system property to map to isn't an error.
1266 int result = 0;
1267
1268 if (nameLen > 4)
1269 {
1270 if (strcmp((const char*)&propName[nameLen - 4], CONNECTION_DEVICE_ID) == 0)
1271 {
1272 // Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_31_063: [ If type is IOTHUB_TYPE_TELEMETRY and the system property `$.cdid` is defined, its value shall be set on the IOTHUB_MESSAGE_HANDLE's ConnectionDeviceId property ]
1273 if (IoTHubMessage_SetConnectionDeviceId(IoTHubMessage, propValue) != IOTHUB_MESSAGE_OK)
1274 {
1275 LogError("Failed to set IOTHUB_MESSAGE_HANDLE 'messageId' property.");
1276 result = MU_FAILURE;
1277 }
1278 return result;
1279 }
1280 if (strcmp((const char*)&propName[nameLen - 4], CONNECTION_MODULE_ID_PROPERTY) == 0)
1281 {
1282 // Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_31_064: [ If type is IOTHUB_TYPE_TELEMETRY and the system property `$.cmid` is defined, its value shall be set on the IOTHUB_MESSAGE_HANDLE's ConnectionModuleId property ]
1283 if (IoTHubMessage_SetConnectionModuleId(IoTHubMessage, propValue) != IOTHUB_MESSAGE_OK)
1284 {
1285 LogError("Failed to set IOTHUB_MESSAGE_HANDLE 'correlationId' property.");
1286 result = MU_FAILURE;
1287 }
1288 return result;
1289 }
1290 }
1291 if (nameLen > 3)
1292 {
1293 if (strcmp((const char*)&propName[nameLen - 3], MESSAGE_ID_PROPERTY) == 0)
1294 {
1295 if (IoTHubMessage_SetMessageId(IoTHubMessage, propValue) != IOTHUB_MESSAGE_OK)
1296 {
1297 LogError("Failed to set IOTHUB_MESSAGE_HANDLE 'messageId' property.");
1298 result = MU_FAILURE;
1299 }
1300 return result;
1301 }
1302 else if (strcmp((const char*)&propName[nameLen - 3], CORRELATION_ID_PROPERTY) == 0)
1303 {
1304 if (IoTHubMessage_SetCorrelationId(IoTHubMessage, propValue) != IOTHUB_MESSAGE_OK)
1305 {
1306 LogError("Failed to set IOTHUB_MESSAGE_HANDLE 'correlationId' property.");
1307 result = MU_FAILURE;
1308 }
1309 return result;
1310 }
1311 }
1312
1313 if (nameLen > 2)
1314 {
1315 // Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_09_012: [ If type is IOTHUB_TYPE_TELEMETRY and the system property `$.ct` is defined, its value shall be set on the IOTHUB_MESSAGE_HANDLE's ContentType property ]
1316 if (strcmp((const char*)&propName[nameLen - 2], CONTENT_TYPE_PROPERTY) == 0)
1317 {
1318 if (IoTHubMessage_SetContentTypeSystemProperty(IoTHubMessage, propValue) != IOTHUB_MESSAGE_OK)
1319 {
1320 LogError("Failed to set IOTHUB_MESSAGE_HANDLE 'customContentType' property.");
1321 result = MU_FAILURE;
1322 }
1323 return result;
1324 }
1325 // Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_09_013: [ If type is IOTHUB_TYPE_TELEMETRY and the system property `$.ce` is defined, its value shall be set on the IOTHUB_MESSAGE_HANDLE's ContentEncoding property ]
1326 else if (strcmp((const char*)&propName[nameLen - 2], CONTENT_ENCODING_PROPERTY) == 0)
1327 {
1328 if (IoTHubMessage_SetContentEncodingSystemProperty(IoTHubMessage, propValue) != IOTHUB_MESSAGE_OK)
1329 {
1330 LogError("Failed to set IOTHUB_MESSAGE_HANDLE 'contentEncoding' property.");
1331 result = MU_FAILURE;
1332 }
1333 return result;
1334 }
1335 }
1336
1337 return result;
1338}
1339
1340static int extractMqttProperties(IOTHUB_MESSAGE_HANDLE IoTHubMessage, const char* topic_name, bool urldecode)
1341{
1342 int result;
1343 STRING_TOKENIZER_HANDLE token = STRING_TOKENIZER_create_from_char(topic_name);
1344 if (token != NULL)
1345 {
1346 MAP_HANDLE propertyMap = IoTHubMessage_Properties(IoTHubMessage);
1347 if (propertyMap == NULL)
1348 {
1349 LogError("Failure to retrieve IoTHubMessage_properties.");
1350 result = MU_FAILURE;
1351 }
1352 else
1353 {
1354 STRING_HANDLE output = STRING_new();
1355 if (output == NULL)
1356 {
1357 LogError("Failure to allocate STRING_new.");
1358 result = MU_FAILURE;
1359 }
1360 else
1361 {
1362 result = 0;
1363
1364 while (STRING_TOKENIZER_get_next_token(token, output, PROPERTY_SEPARATOR) == 0 && result == 0)
1365 {
1366 const char* tokenData = STRING_c_str(output);
1367 size_t tokenLen = strlen(tokenData);
1368 if (tokenData == NULL || tokenLen == 0)
1369 {
1370 break;
1371 }
1372 else
1373 {
1374 if (isSystemProperty(tokenData))
1375 {
1376 const char* iterator = tokenData;
1377 while (iterator != NULL && *iterator != '\0' && result == 0)
1378 {
1379 if (*iterator == '=')
1380 {
1381 char* propName = NULL;
1382 char* propValue = NULL;
1383 size_t nameLen = iterator - tokenData;
1384 size_t valLen = tokenLen - (nameLen + 1) + 1;
1385
1386 if ((propName = malloc(nameLen + 1)) == NULL || (propValue = malloc(valLen + 1)) == NULL)
1387 {
1388 LogError("Failed allocating property name (%p) and/or value (%p)", propName, propValue);
1389 free(propName);
1390 result = MU_FAILURE;
1391 }
1392 else
1393 {
1394 memcpy(propName, tokenData, nameLen);
1395 propName[nameLen] = '\0';
1396
1397 memcpy(propValue, iterator + 1, valLen);
1398 propValue[valLen] = '\0';
1399
1400 if (urldecode)
1401 {
1402 STRING_HANDLE propValue_decoded;
1403 if ((propValue_decoded = URL_DecodeString(propValue)) == NULL)
1404 {
1405 LogError("Failed to URL decode property value");
1406 result = MU_FAILURE;
1407 }
1408 else if (setMqttMessagePropertyIfPossible(IoTHubMessage, propName, STRING_c_str(propValue_decoded), nameLen) != 0)
1409 {
1410 LogError("Unable to set message property");
1411 result = MU_FAILURE;
1412 }
1413 STRING_delete(propValue_decoded);
1414 }
1415 else
1416 {
1417 if (setMqttMessagePropertyIfPossible(IoTHubMessage, propName, propValue, nameLen) != 0)
1418 {
1419 LogError("Unable to set message property");
1420 result = MU_FAILURE;
1421 }
1422 }
1423 free(propName);
1424 free(propValue);
1425 }
1426 break;
1427 }
1428 iterator++;
1429 }
1430 }
1431 else //User Properties
1432 {
1433 const char* iterator = tokenData;
1434 while (iterator != NULL && *iterator != '\0' && result == 0)
1435 {
1436 if (*iterator == '=')
1437 {
1438 char* propName = NULL;
1439 char* propValue = NULL;
1440 size_t nameLen = iterator - tokenData;
1441 size_t valLen = tokenLen - (nameLen + 1) + 1;
1442
1443 if ((propName = (char*)malloc(nameLen + 1)) == NULL || (propValue = (char*)malloc(valLen + 1)) == NULL)
1444 {
1445 free(propName);
1446 LogError("Failed allocating property information");
1447 result = MU_FAILURE;
1448 }
1449 else
1450 {
1451 memcpy(propName, tokenData, nameLen);
1452 propName[nameLen] = '\0';
1453
1454 memcpy(propValue, iterator + 1, valLen);
1455 propValue[valLen] = '\0';
1456
1457 if (urldecode)
1458 {
1459 STRING_HANDLE propName_decoded = URL_DecodeString(propName);
1460 STRING_HANDLE propValue_decoded = URL_DecodeString(propValue);
1461 if (propName_decoded == NULL || propValue_decoded == NULL)
1462 {
1463 LogError("Failed to URL decode property");
1464 result = MU_FAILURE;
1465 }
1466 else if (Map_AddOrUpdate(propertyMap, STRING_c_str(propName_decoded), STRING_c_str(propValue_decoded)) != MAP_OK)
1467 {
1468 LogError("Map_AddOrUpdate failed.");
1469 result = MU_FAILURE;
1470 }
1471 STRING_delete(propName_decoded);
1472 STRING_delete(propValue_decoded);
1473 }
1474 else
1475 {
1476 if (Map_AddOrUpdate(propertyMap, propName, propValue) != MAP_OK)
1477 {
1478 LogError("Map_AddOrUpdate failed.");
1479 result = MU_FAILURE;
1480 }
1481 }
1482 free(propName);
1483 free(propValue);
1484 }
1485 break;
1486 }
1487 iterator++;
1488 }
1489 }
1490 }
1491 }
1492 STRING_delete(output);
1493 }
1494 }
1495 STRING_TOKENIZER_destroy(token);
1496 }
1497 else
1498 {
1499 LogError("Unable to create Tokenizer object.");
1500 result = MU_FAILURE;
1501 }
1502 return result;
1503}
1504
1505static void mqtt_notification_callback(MQTT_MESSAGE_HANDLE msgHandle, void* callbackCtx)
1506{
1507 /* Tests_SRS_IOTHUB_MQTT_TRANSPORT_07_051: [ If msgHandle or callbackCtx is NULL, mqtt_notification_callback shall do nothing. ] */
1508 if (msgHandle != NULL && callbackCtx != NULL)
1509 {
1510 /* Tests_SRS_IOTHUB_MQTT_TRANSPORT_07_052: [ mqtt_notification_callback shall extract the topic Name from the MQTT_MESSAGE_HANDLE. ] */
1511 const char* topic_resp = mqttmessage_getTopicName(msgHandle);
1512 if (topic_resp == NULL)
1513 {
1514 LogError("Failure: NULL topic name encountered");
1515 }
1516 else
1517 {
1518 PMQTTTRANSPORT_HANDLE_DATA transportData = (PMQTTTRANSPORT_HANDLE_DATA)callbackCtx;
1519
1520 IOTHUB_IDENTITY_TYPE type = retrieve_topic_type(topic_resp, STRING_c_str(transportData->topic_InputQueue));
1521 if (type == IOTHUB_TYPE_DEVICE_TWIN)
1522 {
1523 size_t request_id;
1524 int status_code;
1525 bool notification_msg;
1526 if (parse_device_twin_topic_info(topic_resp, &notification_msg, &request_id, &status_code) != 0)
1527 {
1528 LogError("Failure: parsing device topic info");
1529 }
1530 else
1531 {
1532 const APP_PAYLOAD* payload = mqttmessage_getApplicationMsg(msgHandle);
1533 if (notification_msg)
1534 {
1535 transportData->transport_callbacks.twin_retrieve_prop_complete_cb(DEVICE_TWIN_UPDATE_PARTIAL, payload->message, payload->length, transportData->transport_ctx);
1536 }
1537 else
1538 {
1539 PDLIST_ENTRY dev_twin_item = transportData->ack_waiting_queue.Flink;
1540 while (dev_twin_item != &transportData->ack_waiting_queue)
1541 {
1542 DLIST_ENTRY saveListEntry;
1543 saveListEntry.Flink = dev_twin_item->Flink;
1544 MQTT_DEVICE_TWIN_ITEM* msg_entry = containingRecord(dev_twin_item, MQTT_DEVICE_TWIN_ITEM, entry);
1545 if (request_id == msg_entry->packet_id)
1546 {
1547 (void)DList_RemoveEntryList(dev_twin_item);
1548 if (msg_entry->device_twin_msg_type == RETRIEVE_PROPERTIES)
1549 {
1550 if (msg_entry->userCallback == NULL)
1551 {
1552 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_054: [ If type is IOTHUB_TYPE_DEVICE_TWIN, then on success if msg_type is RETRIEVE_PROPERTIES then mqtt_notification_callback shall call IoTHubClientCore_LL_RetrievePropertyComplete... ] */
1553 transportData->transport_callbacks.twin_retrieve_prop_complete_cb(DEVICE_TWIN_UPDATE_COMPLETE, payload->message, payload->length, transportData->transport_ctx);
1554 // Only after receiving device twin request should we start listening for patches.
1555 (void)subscribeToNotifyStateIfNeeded(transportData);
1556 }
1557 else
1558 {
1559 // This is a on-demand get twin request.
1560 msg_entry->userCallback(DEVICE_TWIN_UPDATE_COMPLETE, payload->message, payload->length, msg_entry->userContext);
1561 }
1562 }
1563 else
1564 {
1565 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_055: [ if device_twin_msg_type is not RETRIEVE_PROPERTIES then mqtt_notification_callback shall call IoTHubClientCore_LL_ReportedStateComplete ] */
1566 transportData->transport_callbacks.twin_rpt_state_complete_cb(msg_entry->iothub_msg_id, status_code, transportData->transport_ctx);
1567 // Only after receiving device twin request should we start listening for patches.
1568 (void)subscribeToNotifyStateIfNeeded(transportData);
1569 }
1570
1571 destroy_device_twin_get_message(msg_entry);
1572 break;
1573 }
1574 dev_twin_item = saveListEntry.Flink;
1575 }
1576 }
1577 }
1578 }
1579 else if (type == IOTHUB_TYPE_DEVICE_METHODS)
1580 {
1581 STRING_HANDLE method_name = STRING_new();
1582 if (method_name == NULL)
1583 {
1584 LogError("Failure: allocating method_name string value");
1585 }
1586 else
1587 {
1588 DEVICE_METHOD_INFO* dev_method_info = malloc(sizeof(DEVICE_METHOD_INFO));
1589 if (dev_method_info == NULL)
1590 {
1591 LogError("Failure: allocating DEVICE_METHOD_INFO object");
1592 }
1593 else
1594 {
1595 dev_method_info->request_id = STRING_new();
1596 if (dev_method_info->request_id == NULL)
1597 {
1598 LogError("Failure constructing request_id string");
1599 free(dev_method_info);
1600 }
1601 else if (retrieve_device_method_rid_info(topic_resp, method_name, dev_method_info->request_id) != 0)
1602 {
1603 LogError("Failure: retrieve device topic info");
1604 STRING_delete(dev_method_info->request_id);
1605 free(dev_method_info);
1606 }
1607 else
1608 {
1609 /* CodesSRS_IOTHUB_MQTT_TRANSPORT_07_053: [ If type is IOTHUB_TYPE_DEVICE_METHODS, then on success mqtt_notification_callback shall call IoTHubClientCore_LL_DeviceMethodComplete. ] */
1610 const APP_PAYLOAD* payload = mqttmessage_getApplicationMsg(msgHandle);
1611 if (transportData->transport_callbacks.method_complete_cb(STRING_c_str(method_name), payload->message, payload->length, (void*)dev_method_info, transportData->transport_ctx) != 0)
1612 {
1613 LogError("Failure: IoTHubClientCore_LL_DeviceMethodComplete");
1614 STRING_delete(dev_method_info->request_id);
1615 free(dev_method_info);
1616 }
1617 }
1618 }
1619 STRING_delete(method_name);
1620 }
1621 }
1622 else
1623 {
1624 const APP_PAYLOAD* appPayload = mqttmessage_getApplicationMsg(msgHandle);
1625 IOTHUB_MESSAGE_HANDLE IoTHubMessage = IoTHubMessage_CreateFromByteArray(appPayload->message, appPayload->length);
1626 if (IoTHubMessage == NULL)
1627 {
1628 LogError("Failure: IotHub Message creation has failed.");
1629 }
1630 else
1631 {
1632 if ((type == IOTHUB_TYPE_EVENT_QUEUE) && (addInputNamePropertyToMessage(IoTHubMessage, topic_resp) != 0))
1633 {
1634 LogError("failure adding input name to property.");
1635 }
1636 // Will need to update this when the service has messages that can be rejected
1637 else if (extractMqttProperties(IoTHubMessage, topic_resp, transportData->auto_url_encode_decode) != 0)
1638 {
1639 LogError("failure extracting mqtt properties.");
1640 }
1641 else
1642 {
1643 MESSAGE_CALLBACK_INFO* messageData = (MESSAGE_CALLBACK_INFO*)malloc(sizeof(MESSAGE_CALLBACK_INFO));
1644 if (messageData == NULL)
1645 {
1646 LogError("malloc failed");
1647 }
1648 else
1649 {
1650 messageData->messageHandle = IoTHubMessage;
1651 messageData->transportContext = NULL;
1652
1653 if (type == IOTHUB_TYPE_EVENT_QUEUE)
1654 {
1655 // Codes_SRS_IOTHUB_MQTT_TRANSPORT_31_065: [ If type is IOTHUB_TYPE_TELEMETRY and sent to an input queue, then on success `mqtt_notification_callback` shall call `IoTHubClient_LL_MessageCallback`. ]
1656 if (!transportData->transport_callbacks.msg_input_cb(messageData, transportData->transport_ctx))
1657 {
1658 LogError("IoTHubClientCore_LL_MessageCallbackreturned false");
1659
1660 IoTHubMessage_Destroy(IoTHubMessage);
1661 free(messageData);
1662 }
1663 }
1664 else
1665 {
1666 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_056: [ If type is IOTHUB_TYPE_TELEMETRY, then on success mqtt_notification_callback shall call IoTHubClientCore_LL_MessageCallback. ] */
1667 if (!transportData->transport_callbacks.msg_cb(messageData, transportData->transport_ctx))
1668 {
1669 LogError("IoTHubClientCore_LL_MessageCallback returned false");
1670 IoTHubMessage_Destroy(IoTHubMessage);
1671 free(messageData);
1672 }
1673 }
1674 }
1675 }
1676 }
1677 }
1678 }
1679 }
1680}
1681
1682static void mqtt_operation_complete_callback(MQTT_CLIENT_HANDLE handle, MQTT_CLIENT_EVENT_RESULT actionResult, const void* msgInfo, void* callbackCtx)
1683{
1684 (void)handle;
1685 if (callbackCtx != NULL)
1686 {
1687 PMQTTTRANSPORT_HANDLE_DATA transport_data = (PMQTTTRANSPORT_HANDLE_DATA)callbackCtx;
1688
1689 switch (actionResult)
1690 {
1691 case MQTT_CLIENT_ON_PUBLISH_ACK:
1692 case MQTT_CLIENT_ON_PUBLISH_COMP:
1693 {
1694 const PUBLISH_ACK* puback = (const PUBLISH_ACK*)msgInfo;
1695 if (puback != NULL)
1696 {
1697 PDLIST_ENTRY currentListEntry = transport_data->telemetry_waitingForAck.Flink;
1698 while (currentListEntry != &transport_data->telemetry_waitingForAck)
1699 {
1700 MQTT_MESSAGE_DETAILS_LIST* mqttMsgEntry = containingRecord(currentListEntry, MQTT_MESSAGE_DETAILS_LIST, entry);
1701 DLIST_ENTRY saveListEntry;
1702 saveListEntry.Flink = currentListEntry->Flink;
1703
1704 if (puback->packetId == mqttMsgEntry->packet_id)
1705 {
1706 (void)DList_RemoveEntryList(currentListEntry); //First remove the item from Waiting for Ack List.
1707 sendMsgComplete(mqttMsgEntry->iotHubMessageEntry, transport_data, IOTHUB_CLIENT_CONFIRMATION_OK);
1708 free(mqttMsgEntry);
1709 }
1710 currentListEntry = saveListEntry.Flink;
1711 }
1712 }
1713 else
1714 {
1715 LogError("Failure: MQTT_CLIENT_ON_PUBLISH_ACK publish_ack structure NULL.");
1716 }
1717 break;
1718 }
1719 case MQTT_CLIENT_ON_CONNACK:
1720 {
1721 const CONNECT_ACK* connack = (const CONNECT_ACK*)msgInfo;
1722 if (connack != NULL)
1723 {
1724 if (connack->returnCode == CONNECTION_ACCEPTED)
1725 {
1726 // The connect packet has been acked
1727 transport_data->currPacketState = CONNACK_TYPE;
1728 transport_data->isRecoverableError = true;
1729 transport_data->mqttClientStatus = MQTT_CLIENT_STATUS_CONNECTED;
1730
1731 // Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_09_008: [ Upon successful connection the retry control shall be reset using retry_control_reset() ]
1732 retry_control_reset(transport_data->retry_control_handle);
1733
1734 transport_data->transport_callbacks.connection_status_cb(IOTHUB_CLIENT_CONNECTION_AUTHENTICATED, IOTHUB_CLIENT_CONNECTION_OK, transport_data->transport_ctx);
1735 }
1736 else
1737 {
1738 if (connack->returnCode == CONN_REFUSED_SERVER_UNAVAIL)
1739 {
1740 transport_data->transport_callbacks.connection_status_cb(IOTHUB_CLIENT_CONNECTION_UNAUTHENTICATED, IOTHUB_CLIENT_CONNECTION_DEVICE_DISABLED, transport_data->transport_ctx);
1741 }
1742 else if (connack->returnCode == CONN_REFUSED_BAD_USERNAME_PASSWORD || connack->returnCode == CONN_REFUSED_ID_REJECTED)
1743 {
1744 transport_data->isRecoverableError = false;
1745 transport_data->transport_callbacks.connection_status_cb(IOTHUB_CLIENT_CONNECTION_UNAUTHENTICATED, IOTHUB_CLIENT_CONNECTION_BAD_CREDENTIAL, transport_data->transport_ctx);
1746 }
1747 else if (connack->returnCode == CONN_REFUSED_NOT_AUTHORIZED)
1748 {
1749 transport_data->transport_callbacks.connection_status_cb(IOTHUB_CLIENT_CONNECTION_UNAUTHENTICATED, IOTHUB_CLIENT_CONNECTION_BAD_CREDENTIAL, transport_data->transport_ctx);
1750 }
1751 else if (connack->returnCode == CONN_REFUSED_UNACCEPTABLE_VERSION)
1752 {
1753 transport_data->isRecoverableError = false;
1754 }
1755 LogError("Connection Not Accepted: 0x%x: %s", connack->returnCode, retrieve_mqtt_return_codes(connack->returnCode));
1756 transport_data->mqttClientStatus = MQTT_CLIENT_STATUS_PENDING_CLOSE;
1757 transport_data->currPacketState = PACKET_TYPE_ERROR;
1758 }
1759 }
1760 else
1761 {
1762 LogError("MQTT_CLIENT_ON_CONNACK CONNACK parameter is NULL.");
1763 }
1764 break;
1765 }
1766 case MQTT_CLIENT_ON_SUBSCRIBE_ACK:
1767 {
1768 const SUBSCRIBE_ACK* suback = (const SUBSCRIBE_ACK*)msgInfo;
1769 if (suback != NULL)
1770 {
1771 size_t index = 0;
1772 for (index = 0; index < suback->qosCount; index++)
1773 {
1774 if (suback->qosReturn[index] == DELIVER_FAILURE)
1775 {
1776 LogError("Subscribe delivery failure of subscribe %lu", (unsigned long)index);
1777 }
1778 }
1779 // The subscribed packet has been acked
1780 transport_data->currPacketState = SUBACK_TYPE;
1781
1782 // Is this a twin message
1783 if (suback->packetId == transport_data->twin_resp_packet_id)
1784 {
1785 transport_data->twin_resp_sub_recv = true;
1786 }
1787 }
1788 else
1789 {
1790 LogError("Failure: MQTT_CLIENT_ON_SUBSCRIBE_ACK SUBSCRIBE_ACK parameter is NULL.");
1791 }
1792 break;
1793 }
1794 case MQTT_CLIENT_ON_PUBLISH_RECV:
1795 case MQTT_CLIENT_ON_PUBLISH_REL:
1796 {
1797 // Currently not used
1798 break;
1799 }
1800 case MQTT_CLIENT_ON_DISCONNECT:
1801 {
1802 // Close the client so we can reconnect again
1803 transport_data->mqttClientStatus = MQTT_CLIENT_STATUS_NOT_CONNECTED;
1804 break;
1805 }
1806 case MQTT_CLIENT_ON_UNSUBSCRIBE_ACK:
1807 case MQTT_CLIENT_ON_PING_RESPONSE:
1808 default:
1809 {
1810 break;
1811 }
1812 }
1813 }
1814}
1815
1816// Prior to creating a new connection, if we have an existing xioTransport that has been connected before
1817// we need to clear it now or else cached settings (especially TLS when communicating with HTTP proxies)
1818// will break reconnection attempt.
1819static void ResetConnectionIfNecessary(PMQTTTRANSPORT_HANDLE_DATA transport_data)
1820{
1821 if (transport_data->xioTransport != NULL && transport_data->conn_attempted)
1822 {
1823 OPTIONHANDLER_HANDLE options = xio_retrieveoptions(transport_data->xioTransport);
1824 set_saved_tls_options(transport_data, options);
1825 DestroyXioTransport(transport_data);
1826 }
1827}
1828
// Disconnect-complete callback: ctx points to an int flag that is set to 1 to
// signal that the disconnect was processed. A NULL ctx is ignored.
static void mqtt_disconnect_cb(void* ctx)
{
    int* disconnect_flag = (int*)ctx;

    if (disconnect_flag == NULL)
    {
        return;
    }
    *disconnect_flag = 1;
}
1837
1838static void DisconnectFromClient(PMQTTTRANSPORT_HANDLE_DATA transport_data)
1839{
1840 if (transport_data->currPacketState != DISCONNECT_TYPE)
1841 {
1842 if (!transport_data->isDestroyCalled)
1843 {
1844 OPTIONHANDLER_HANDLE options = xio_retrieveoptions(transport_data->xioTransport);
1845 set_saved_tls_options(transport_data, options);
1846 }
1847 // Ensure the disconnect message is sent
1848 if (transport_data->mqttClientStatus == MQTT_CLIENT_STATUS_CONNECTED)
1849 {
1850 transport_data->disconnect_recv_flag = 0;
1851 (void)mqtt_client_disconnect(transport_data->mqttClient, mqtt_disconnect_cb, &transport_data->disconnect_recv_flag);
1852 size_t disconnect_ctr = 0;
1853 do
1854 {
1855 mqtt_client_dowork(transport_data->mqttClient);
1856 disconnect_ctr++;
1857 ThreadAPI_Sleep(50);
1858 } while ((disconnect_ctr < MAX_DISCONNECT_VALUE) && (transport_data->disconnect_recv_flag == 0));
1859 }
1860 DestroyXioTransport(transport_data);
1861
1862 transport_data->device_twin_get_sent = false;
1863 transport_data->mqttClientStatus = MQTT_CLIENT_STATUS_NOT_CONNECTED;
1864 transport_data->currPacketState = DISCONNECT_TYPE;
1865 }
1866}
1867
1868static void mqtt_error_callback(MQTT_CLIENT_HANDLE handle, MQTT_CLIENT_EVENT_ERROR error, void* callbackCtx)
1869{
1870 (void)handle;
1871 if (callbackCtx != NULL)
1872 {
1873 PMQTTTRANSPORT_HANDLE_DATA transport_data = (PMQTTTRANSPORT_HANDLE_DATA)callbackCtx;
1874 switch (error)
1875 {
1876 case MQTT_CLIENT_CONNECTION_ERROR:
1877 {
1878 transport_data->transport_callbacks.connection_status_cb(IOTHUB_CLIENT_CONNECTION_UNAUTHENTICATED, IOTHUB_CLIENT_CONNECTION_NO_NETWORK, transport_data->transport_ctx);
1879 break;
1880 }
1881 case MQTT_CLIENT_COMMUNICATION_ERROR:
1882 {
1883 transport_data->transport_callbacks.connection_status_cb(IOTHUB_CLIENT_CONNECTION_UNAUTHENTICATED, IOTHUB_CLIENT_CONNECTION_COMMUNICATION_ERROR, transport_data->transport_ctx);
1884 break;
1885 }
1886 case MQTT_CLIENT_NO_PING_RESPONSE:
1887 {
1888 LogError("Mqtt Ping Response was not encountered. Reconnecting device...");
1889 transport_data->transport_callbacks.connection_status_cb(IOTHUB_CLIENT_CONNECTION_UNAUTHENTICATED, IOTHUB_CLIENT_CONNECTION_NO_PING_RESPONSE, transport_data->transport_ctx);
1890 break;
1891 }
1892 case MQTT_CLIENT_PARSE_ERROR:
1893 case MQTT_CLIENT_MEMORY_ERROR:
1894 case MQTT_CLIENT_UNKNOWN_ERROR:
1895 default:
1896 {
1897 LogError("INTERNAL ERROR: unexpected error value received %s", MU_ENUM_TO_STRING(MQTT_CLIENT_EVENT_ERROR, error));
1898 break;
1899 }
1900 }
1901 if (transport_data->mqttClientStatus != MQTT_CLIENT_STATUS_PENDING_CLOSE)
1902 {
1903 // We have encountered an mqtt protocol error in an non-closing state
1904 // The best course of action is to execute a shutdown of the mqtt/tls/socket
1905 // layer and then attempt to reconnect
1906 transport_data->mqttClientStatus = MQTT_CLIENT_STATUS_EXECUTE_DISCONNECT;
1907 }
1908 transport_data->currPacketState = PACKET_TYPE_ERROR;
1909 transport_data->device_twin_get_sent = false;
1910 if (transport_data->topic_MqttMessage != NULL)
1911 {
1912 transport_data->topics_ToSubscribe |= SUBSCRIBE_TELEMETRY_TOPIC;
1913 }
1914 if (transport_data->topic_GetState != NULL)
1915 {
1916 transport_data->topics_ToSubscribe |= SUBSCRIBE_GET_REPORTED_STATE_TOPIC;
1917 }
1918 if (transport_data->topic_NotifyState != NULL)
1919 {
1920 transport_data->topics_ToSubscribe |= SUBSCRIBE_NOTIFICATION_STATE_TOPIC;
1921 }
1922 if (transport_data->topic_DeviceMethods != NULL)
1923 {
1924 transport_data->topics_ToSubscribe |= SUBSCRIBE_DEVICE_METHOD_TOPIC;
1925 }
1926 if (transport_data->topic_InputQueue != NULL)
1927 {
1928 transport_data->topics_ToSubscribe |= SUBSCRIBE_INPUT_QUEUE_TOPIC;
1929 }
1930 }
1931 else
1932 {
1933 LogError("Failure: mqtt called back with null context.");
1934 }
1935}
1936
1937static void SubscribeToMqttProtocol(PMQTTTRANSPORT_HANDLE_DATA transport_data)
1938{
1939 if (transport_data->topics_ToSubscribe != UNSUBSCRIBE_FROM_TOPIC)
1940 {
1941 uint32_t topic_subscription = 0;
1942 size_t subscribe_count = 0;
1943 uint16_t packet_id = get_next_packet_id(transport_data);
1944 SUBSCRIBE_PAYLOAD subscribe[SUBSCRIBE_TOPIC_COUNT];
1945 if ((transport_data->topic_MqttMessage != NULL) && (SUBSCRIBE_TELEMETRY_TOPIC & transport_data->topics_ToSubscribe))
1946 {
1947 subscribe[subscribe_count].subscribeTopic = STRING_c_str(transport_data->topic_MqttMessage);
1948 subscribe[subscribe_count].qosReturn = DELIVER_AT_LEAST_ONCE;
1949 topic_subscription |= SUBSCRIBE_TELEMETRY_TOPIC;
1950 subscribe_count++;
1951 }
1952 if ((transport_data->topic_GetState != NULL) && (SUBSCRIBE_GET_REPORTED_STATE_TOPIC & transport_data->topics_ToSubscribe))
1953 {
1954 subscribe[subscribe_count].subscribeTopic = STRING_c_str(transport_data->topic_GetState);
1955 subscribe[subscribe_count].qosReturn = DELIVER_AT_MOST_ONCE;
1956 topic_subscription |= SUBSCRIBE_GET_REPORTED_STATE_TOPIC;
1957 subscribe_count++;
1958 transport_data->twin_resp_packet_id = packet_id;
1959 }
1960 if ((transport_data->topic_NotifyState != NULL) && (SUBSCRIBE_NOTIFICATION_STATE_TOPIC & transport_data->topics_ToSubscribe))
1961 {
1962 subscribe[subscribe_count].subscribeTopic = STRING_c_str(transport_data->topic_NotifyState);
1963 subscribe[subscribe_count].qosReturn = DELIVER_AT_MOST_ONCE;
1964 topic_subscription |= SUBSCRIBE_NOTIFICATION_STATE_TOPIC;
1965 subscribe_count++;
1966 }
1967 if ((transport_data->topic_DeviceMethods != NULL) && (SUBSCRIBE_DEVICE_METHOD_TOPIC & transport_data->topics_ToSubscribe))
1968 {
1969 subscribe[subscribe_count].subscribeTopic = STRING_c_str(transport_data->topic_DeviceMethods);
1970 subscribe[subscribe_count].qosReturn = DELIVER_AT_MOST_ONCE;
1971 topic_subscription |= SUBSCRIBE_DEVICE_METHOD_TOPIC;
1972 subscribe_count++;
1973 }
1974 if ((transport_data->topic_InputQueue != NULL) && (SUBSCRIBE_INPUT_QUEUE_TOPIC & transport_data->topics_ToSubscribe))
1975 {
1976 subscribe[subscribe_count].subscribeTopic = STRING_c_str(transport_data->topic_InputQueue);
1977 subscribe[subscribe_count].qosReturn = DELIVER_AT_MOST_ONCE;
1978 topic_subscription |= SUBSCRIBE_INPUT_QUEUE_TOPIC;
1979 subscribe_count++;
1980 }
1981
1982 if (subscribe_count != 0)
1983 {
1984 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_016: [IoTHubTransport_MQTT_Common_Subscribe shall call mqtt_client_subscribe to subscribe to the Message Topic.] */
1985 if (mqtt_client_subscribe(transport_data->mqttClient, packet_id, subscribe, subscribe_count) != 0)
1986 {
1987 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_017: [Upon failure IoTHubTransport_MQTT_Common_Subscribe shall return a non-zero value.] */
1988 LogError("Failure: mqtt_client_subscribe returned error.");
1989 }
1990 else
1991 {
1992 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_018: [On success IoTHubTransport_MQTT_Common_Subscribe shall return 0.] */
1993 transport_data->topics_ToSubscribe &= ~topic_subscription;
1994 transport_data->currPacketState = SUBSCRIBE_TYPE;
1995 }
1996 }
1997 else
1998 {
1999 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_017: [Upon failure IoTHubTransport_MQTT_Common_Subscribe shall return a non-zero value.] */
2000 LogError("Failure: subscribe_topic is empty.");
2001 }
2002 transport_data->currPacketState = SUBSCRIBE_TYPE;
2003 }
2004 else
2005 {
2006 transport_data->currPacketState = PUBLISH_TYPE;
2007 }
2008}
2009
2010static bool RetrieveMessagePayload(IOTHUB_MESSAGE_HANDLE messageHandle, const unsigned char** payload, size_t* length)
2011{
2012 bool result;
2013 IOTHUBMESSAGE_CONTENT_TYPE contentType = IoTHubMessage_GetContentType(messageHandle);
2014 if (contentType == IOTHUBMESSAGE_BYTEARRAY)
2015 {
2016 if (IoTHubMessage_GetByteArray(messageHandle, &(*payload), length) != IOTHUB_MESSAGE_OK)
2017 {
2018 LogError("Failure result from IoTHubMessage_GetByteArray");
2019 result = false;
2020 *length = 0;
2021 }
2022 else
2023 {
2024 result = true;
2025 }
2026 }
2027 else if (contentType == IOTHUBMESSAGE_STRING)
2028 {
2029 *payload = (const unsigned char*)IoTHubMessage_GetString(messageHandle);
2030 if (*payload == NULL)
2031 {
2032 LogError("Failure result from IoTHubMessage_GetString");
2033 result = false;
2034 *length = 0;
2035 }
2036 else
2037 {
2038 *length = strlen((const char*)*payload);
2039 result = true;
2040 }
2041 }
2042 else
2043 {
2044 result = false;
2045 *length = 0;
2046 }
2047 return result;
2048}
2049
2050static void process_queued_ack_messages(PMQTTTRANSPORT_HANDLE_DATA transport_data)
2051{
2052 PDLIST_ENTRY current_entry = transport_data->telemetry_waitingForAck.Flink;
2053 tickcounter_ms_t current_ms;
2054 (void)tickcounter_get_current_ms(transport_data->msgTickCounter, &current_ms);
2055 while (current_entry != &transport_data->telemetry_waitingForAck)
2056 {
2057 MQTT_MESSAGE_DETAILS_LIST* msg_detail_entry = containingRecord(current_entry, MQTT_MESSAGE_DETAILS_LIST, entry);
2058 DLIST_ENTRY nextListEntry;
2059 nextListEntry.Flink = current_entry->Flink;
2060
2061 if (((current_ms - msg_detail_entry->msgPublishTime) / 1000) > RESEND_TIMEOUT_VALUE_MIN)
2062 {
2063 if (msg_detail_entry->retryCount >= MAX_SEND_RECOUNT_LIMIT)
2064 {
2065 sendMsgComplete(msg_detail_entry->iotHubMessageEntry, transport_data, IOTHUB_CLIENT_CONFIRMATION_MESSAGE_TIMEOUT);
2066 (void)DList_RemoveEntryList(current_entry);
2067 free(msg_detail_entry);
2068
2069 DisconnectFromClient(transport_data);
2070 }
2071 else
2072 {
2073 // Ensure that the packet state is PUBLISH_TYPE and then attempt to send the message
2074 // again
2075 if (transport_data->currPacketState == PUBLISH_TYPE)
2076 {
2077 size_t messageLength;
2078 const unsigned char* messagePayload = NULL;
2079 if (!RetrieveMessagePayload(msg_detail_entry->iotHubMessageEntry->messageHandle, &messagePayload, &messageLength))
2080 {
2081 (void)DList_RemoveEntryList(current_entry);
2082 sendMsgComplete(msg_detail_entry->iotHubMessageEntry, transport_data, IOTHUB_CLIENT_CONFIRMATION_ERROR);
2083 }
2084 else
2085 {
2086 if (publish_mqtt_telemetry_msg(transport_data, msg_detail_entry, messagePayload, messageLength) != 0)
2087 {
2088 (void)DList_RemoveEntryList(current_entry);
2089 sendMsgComplete(msg_detail_entry->iotHubMessageEntry, transport_data, IOTHUB_CLIENT_CONFIRMATION_ERROR);
2090 free(msg_detail_entry);
2091 }
2092 }
2093 }
2094 else
2095 {
2096 msg_detail_entry->retryCount++;
2097 msg_detail_entry->msgPublishTime = current_ms;
2098 }
2099 }
2100 }
2101 current_entry = nextListEntry.Flink;
2102 }
2103}
2104
2105static int GetTransportProviderIfNecessary(PMQTTTRANSPORT_HANDLE_DATA transport_data)
2106{
2107 int result;
2108
2109 if (transport_data->xioTransport == NULL)
2110 {
2111 // construct address
2112 const char* hostAddress = STRING_c_str(transport_data->hostAddress);
2113 MQTT_TRANSPORT_PROXY_OPTIONS mqtt_proxy_options;
2114
2115 /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_01_011: [ If no `proxy_data` option has been set, NULL shall be passed as the argument `mqtt_transport_proxy_options` when calling the function `get_io_transport` passed in `IoTHubTransport_MQTT_Common__Create`. ]*/
2116 mqtt_proxy_options.host_address = transport_data->http_proxy_hostname;
2117 mqtt_proxy_options.port = transport_data->http_proxy_port;
2118 mqtt_proxy_options.username = transport_data->http_proxy_username;
2119 mqtt_proxy_options.password = transport_data->http_proxy_password;
2120
2121 /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_01_010: [ If the `proxy_data` option has been set, the proxy options shall be filled in the argument `mqtt_transport_proxy_options` when calling the function `get_io_transport` passed in `IoTHubTransport_MQTT_Common__Create` to obtain the underlying IO handle. ]*/
2122 transport_data->xioTransport = transport_data->get_io_transport(hostAddress, (transport_data->http_proxy_hostname == NULL) ? NULL : &mqtt_proxy_options);
2123 if (transport_data->xioTransport == NULL)
2124 {
2125 LogError("Unable to create the lower level TLS layer.");
2126 result = MU_FAILURE;
2127 }
2128 else
2129 {
2130 if (transport_data->saved_tls_options != NULL)
2131 {
2132 if (OptionHandler_FeedOptions(transport_data->saved_tls_options, transport_data->xioTransport) != OPTIONHANDLER_OK)
2133 {
2134 LogError("Failed feeding existing options to new TLS instance.");
2135 result = MU_FAILURE;
2136 }
2137 else
2138 {
2139 // The tlsio has the options, so our copy can be deleted
2140 set_saved_tls_options(transport_data, NULL);
2141 result = 0;
2142 }
2143 }
2144 else if (IoTHubClient_Auth_Get_Credential_Type(transport_data->authorization_module) == IOTHUB_CREDENTIAL_TYPE_X509_ECC)
2145 {
2146 if (IoTHubClient_Auth_Set_xio_Certificate(transport_data->authorization_module, transport_data->xioTransport) != 0)
2147 {
2148 LogError("Unable to create the lower level TLS layer.");
2149 result = MU_FAILURE;
2150 }
2151 else
2152 {
2153 result = 0;
2154 }
2155 }
2156 else
2157 {
2158 result = 0;
2159 }
2160 }
2161 }
2162 else
2163 {
2164 result = 0;
2165 }
2166 return result;
2167}
2168
2169static STRING_HANDLE buildClientId(const char* device_id, const char* module_id)
2170{
2171 if (module_id == NULL)
2172 {
2173 return STRING_construct_sprintf("%s", device_id);
2174 }
2175 else
2176 {
2177 return STRING_construct_sprintf("%s/%s", device_id, module_id);
2178 }
2179}
2180
2181static int buildConfigForUsernameStep2IfNeeded(PMQTTTRANSPORT_HANDLE_DATA transport_data)
2182{
2183 int result;
2184
2185 if (!transport_data->isConnectUsernameSet)
2186 {
2187 STRING_HANDLE userName = NULL;
2188 STRING_HANDLE modelIdParameter = NULL;
2189 STRING_HANDLE urlEncodedModelId = NULL;
2190 const char* modelId = transport_data->transport_callbacks.get_model_id_cb(transport_data->transport_ctx);
2191 // TODO: The preview API version in SDK is only scoped to scenarios that require the modelId to be set.
2192 // https://github.com/Azure/azure-iot-sdk-c/issues/1547 tracks removing this once non-preview API versions support modelId.
2193 const char* apiVersion = (modelId != NULL) ? IOTHUB_API_PREVIEW_VERSION : IOTHUB_API_VERSION;
2194 const char* appSpecifiedProductInfo = transport_data->transport_callbacks.prod_info_cb(transport_data->transport_ctx);
2195 STRING_HANDLE productInfoEncoded = NULL;
2196
2197 if ((productInfoEncoded = URL_EncodeString((appSpecifiedProductInfo != NULL) ? appSpecifiedProductInfo : DEFAULT_IOTHUB_PRODUCT_IDENTIFIER)) == NULL)
2198 {
2199 LogError("Unable to UrlEncode productInfo");
2200 result = MU_FAILURE;
2201 }
2202 else if ((userName = STRING_construct_sprintf("%s?api-version=%s&DeviceClientType=%s", STRING_c_str(transport_data->configPassedThroughUsername), apiVersion, STRING_c_str(productInfoEncoded))) == NULL)
2203 {
2204 LogError("Failed constructing string");
2205 result = 0;
2206 }
2207 else if (modelId != NULL)
2208 {
2209 if ((urlEncodedModelId = URL_EncodeString(modelId)) == NULL)
2210 {
2211 LogError("Failed to URL encode the modelID string");
2212 result = MU_FAILURE;
2213 }
2214 else if ((modelIdParameter = STRING_construct_sprintf("&%s=%s", DT_MODEL_ID_TOKEN, STRING_c_str(urlEncodedModelId))) == NULL)
2215 {
2216 LogError("Cannot build modelID string");
2217 result = MU_FAILURE;
2218 }
2219 else if (STRING_concat_with_STRING(userName, modelIdParameter) != 0)
2220 {
2221 LogError("Failed to set modelID parameter in connect");
2222 result = MU_FAILURE;
2223 }
2224 else
2225 {
2226 result = 0;
2227 }
2228 }
2229 else
2230 {
2231 result = 0;
2232 }
2233
2234 if (result == 0)
2235 {
2236 STRING_delete(transport_data->configPassedThroughUsername);
2237 transport_data->configPassedThroughUsername = userName;
2238 userName = NULL;
2239 // setting connect string is only allowed once in the lifetime of the device client.
2240 transport_data->isConnectUsernameSet = true;
2241 }
2242
2243 STRING_delete(userName);
2244 STRING_delete(modelIdParameter);
2245 STRING_delete(urlEncodedModelId);
2246 STRING_delete(productInfoEncoded);
2247 }
2248 else
2249 {
2250 result = 0;
2251 }
2252
2253 return result;
2254}
2255
2256static int SendMqttConnectMsg(PMQTTTRANSPORT_HANDLE_DATA transport_data)
2257{
2258 int result;
2259
2260 char* sasToken = NULL;
2261 result = 0;
2262
2263 IOTHUB_CREDENTIAL_TYPE cred_type = IoTHubClient_Auth_Get_Credential_Type(transport_data->authorization_module);
2264 if (cred_type == IOTHUB_CREDENTIAL_TYPE_DEVICE_KEY || cred_type == IOTHUB_CREDENTIAL_TYPE_DEVICE_AUTH)
2265 {
2266 sasToken = IoTHubClient_Auth_Get_SasToken(transport_data->authorization_module, STRING_c_str(transport_data->devicesAndModulesPath), 0, NULL);
2267 if (sasToken == NULL)
2268 {
2269 LogError("failure getting sas token from IoTHubClient_Auth_Get_SasToken.");
2270 result = MU_FAILURE;
2271 }
2272 }
2273 else if (cred_type == IOTHUB_CREDENTIAL_TYPE_SAS_TOKEN)
2274 {
2275 SAS_TOKEN_STATUS token_status = IoTHubClient_Auth_Is_SasToken_Valid(transport_data->authorization_module);
2276 if (token_status == SAS_TOKEN_STATUS_INVALID)
2277 {
2278 transport_data->transport_callbacks.connection_status_cb(IOTHUB_CLIENT_CONNECTION_UNAUTHENTICATED, IOTHUB_CLIENT_CONNECTION_EXPIRED_SAS_TOKEN, transport_data->transport_ctx);
2279 result = MU_FAILURE;
2280 }
2281 else if (token_status == SAS_TOKEN_STATUS_FAILED)
2282 {
2283 transport_data->transport_callbacks.connection_status_cb(IOTHUB_CLIENT_CONNECTION_UNAUTHENTICATED, IOTHUB_CLIENT_CONNECTION_BAD_CREDENTIAL, transport_data->transport_ctx);
2284 result = MU_FAILURE;
2285 }
2286 else
2287 {
2288 sasToken = IoTHubClient_Auth_Get_SasToken(transport_data->authorization_module, NULL, 0, NULL);
2289 if (sasToken == NULL)
2290 {
2291 LogError("failure getting sas Token.");
2292 result = MU_FAILURE;
2293 }
2294 }
2295 }
2296
2297 if (result == 0)
2298 {
2299 STRING_HANDLE clientId;
2300 if (buildConfigForUsernameStep2IfNeeded(transport_data) != 0)
2301 {
2302 LogError("Failed to add optional connect parameters.");
2303 result = MU_FAILURE;
2304 }
2305 else if ((clientId = buildClientId(STRING_c_str(transport_data->device_id), STRING_c_str(transport_data->module_id))) == NULL)
2306 {
2307 LogError("Unable to allocate clientId");
2308 result = MU_FAILURE;
2309 }
2310 else
2311 {
2312 MQTT_CLIENT_OPTIONS options = { 0 };
2313 options.clientId = (char*)STRING_c_str(clientId);
2314 options.willMessage = NULL;
2315 options.username = (char*)STRING_c_str(transport_data->configPassedThroughUsername);
2316 if (sasToken != NULL)
2317 {
2318 options.password = sasToken;
2319 }
2320 options.keepAliveInterval = transport_data->keepAliveValue;
2321 options.useCleanSession = false;
2322 options.qualityOfServiceValue = DELIVER_AT_LEAST_ONCE;
2323
2324 if (GetTransportProviderIfNecessary(transport_data) == 0)
2325 {
2326 transport_data->conn_attempted = true;
2327 if (mqtt_client_connect(transport_data->mqttClient, transport_data->xioTransport, &options) != 0)
2328 {
2329 LogError("failure connecting to address %s.", STRING_c_str(transport_data->hostAddress));
2330 result = MU_FAILURE;
2331 }
2332 else
2333 {
2334 transport_data->currPacketState = CONNECT_TYPE;
2335 transport_data->isRetryExpiredCallbackSet = false;
2336 (void)tickcounter_get_current_ms(transport_data->msgTickCounter, &transport_data->mqtt_connect_time);
2337 result = 0;
2338 }
2339 }
2340 else
2341 {
2342 result = MU_FAILURE;
2343 }
2344
2345 if (sasToken != NULL)
2346 {
2347 free(sasToken);
2348 }
2349 STRING_delete(clientId);
2350 }
2351 }
2352 return result;
2353}
2354
2355static int InitializeConnection(PMQTTTRANSPORT_HANDLE_DATA transport_data)
2356{
2357 int result = 0;
2358
2359 // Make sure we're not destroying the object
2360 if (!transport_data->isDestroyCalled)
2361 {
2362 RETRY_ACTION retry_action = RETRY_ACTION_RETRY_LATER;
2363
2364 // Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_09_007: [ IoTHubTransport_MQTT_Common_DoWork shall try to reconnect according to the current retry policy set ]
2365 if (transport_data->mqttClientStatus == MQTT_CLIENT_STATUS_NOT_CONNECTED && transport_data->isRecoverableError)
2366 {
2367 // Note: in case retry_control_should_retry fails, the reconnection shall be attempted anyway (defaulting to policy IOTHUB_CLIENT_RETRY_IMMEDIATE).
2368 if (!transport_data->conn_attempted || retry_control_should_retry(transport_data->retry_control_handle, &retry_action) != 0 || retry_action == RETRY_ACTION_RETRY_NOW)
2369 {
2370 if (tickcounter_get_current_ms(transport_data->msgTickCounter, &transport_data->connectTick) != 0)
2371 {
2372 transport_data->connectFailCount++;
2373 result = MU_FAILURE;
2374 }
2375 else
2376 {
2377 ResetConnectionIfNecessary(transport_data);
2378
2379 if (SendMqttConnectMsg(transport_data) != 0)
2380 {
2381 transport_data->connectFailCount++;
2382 result = MU_FAILURE;
2383 }
2384 else
2385 {
2386 transport_data->mqttClientStatus = MQTT_CLIENT_STATUS_CONNECTING;
2387 transport_data->connectFailCount = 0;
2388 result = 0;
2389 }
2390 }
2391 }
2392 else if (retry_action == RETRY_ACTION_STOP_RETRYING)
2393 {
2394 // Set callback if retry expired
2395 if (!transport_data->isRetryExpiredCallbackSet)
2396 {
2397 transport_data->transport_callbacks.connection_status_cb(IOTHUB_CLIENT_CONNECTION_UNAUTHENTICATED, IOTHUB_CLIENT_CONNECTION_RETRY_EXPIRED, transport_data->transport_ctx);
2398 transport_data->isRetryExpiredCallbackSet = true;
2399 }
2400 result = MU_FAILURE;
2401 }
2402 else if (retry_action == RETRY_ACTION_RETRY_LATER)
2403 {
2404 result = MU_FAILURE;
2405 }
2406 }
2407 else if (transport_data->mqttClientStatus == MQTT_CLIENT_STATUS_EXECUTE_DISCONNECT)
2408 {
2409 // Need to disconnect from client
2410 DisconnectFromClient(transport_data);
2411 result = 0;
2412 }
2413 // Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_09_001: [ IoTHubTransport_MQTT_Common_DoWork shall trigger reconnection if the mqtt_client_connect does not complete within `keepalive` seconds]
2414 else if (transport_data->mqttClientStatus == MQTT_CLIENT_STATUS_CONNECTING)
2415 {
2416 tickcounter_ms_t current_time;
2417 if (tickcounter_get_current_ms(transport_data->msgTickCounter, &current_time) != 0)
2418 {
2419 LogError("failed verifying MQTT_CLIENT_STATUS_CONNECTING timeout");
2420 result = MU_FAILURE;
2421 }
2422 else if ((current_time - transport_data->mqtt_connect_time) / 1000 > transport_data->connect_timeout_in_sec)
2423 {
2424 LogError("mqtt_client timed out waiting for CONNACK");
2425 transport_data->currPacketState = PACKET_TYPE_ERROR;
2426 DisconnectFromClient(transport_data);
2427 result = MU_FAILURE;
2428 }
2429 }
2430 else if (transport_data->mqttClientStatus == MQTT_CLIENT_STATUS_CONNECTED)
2431 {
2432 // We are connected and not being closed, so does SAS need to reconnect?
2433 tickcounter_ms_t current_time;
2434 if (tickcounter_get_current_ms(transport_data->msgTickCounter, &current_time) != 0)
2435 {
2436 transport_data->connectFailCount++;
2437 result = MU_FAILURE;
2438 }
2439 else
2440 {
2441 IOTHUB_CREDENTIAL_TYPE cred_type = IoTHubClient_Auth_Get_Credential_Type(transport_data->authorization_module);
2442 // If the credential type is not an x509 certificate then we shall renew the Sas_Token
2443 if (cred_type != IOTHUB_CREDENTIAL_TYPE_X509 && cred_type != IOTHUB_CREDENTIAL_TYPE_X509_ECC)
2444 {
2445 size_t sas_token_expiry = IoTHubClient_Auth_Get_SasToken_Expiry(transport_data->authorization_module);
2446 if ((current_time - transport_data->mqtt_connect_time) / 1000 > (sas_token_expiry*SAS_REFRESH_MULTIPLIER))
2447 {
2448 /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_07_058: [ If the sas token has timed out IoTHubTransport_MQTT_Common_DoWork shall disconnect from the mqtt client and destroy the transport information and wait for reconnect. ] */
2449 DisconnectFromClient(transport_data);
2450
2451 transport_data->transport_callbacks.connection_status_cb(IOTHUB_CLIENT_CONNECTION_UNAUTHENTICATED, IOTHUB_CLIENT_CONNECTION_EXPIRED_SAS_TOKEN, transport_data->transport_ctx);
2452 transport_data->currPacketState = UNKNOWN_TYPE;
2453 if (transport_data->topic_MqttMessage != NULL)
2454 {
2455 transport_data->topics_ToSubscribe |= SUBSCRIBE_TELEMETRY_TOPIC;
2456 }
2457 if (transport_data->topic_GetState != NULL)
2458 {
2459 transport_data->topics_ToSubscribe |= SUBSCRIBE_GET_REPORTED_STATE_TOPIC;
2460 }
2461 if (transport_data->topic_NotifyState != NULL)
2462 {
2463 transport_data->topics_ToSubscribe |= SUBSCRIBE_NOTIFICATION_STATE_TOPIC;
2464 }
2465 if (transport_data->topic_DeviceMethods != NULL)
2466 {
2467 transport_data->topics_ToSubscribe |= SUBSCRIBE_DEVICE_METHOD_TOPIC;
2468 }
2469 if (transport_data->topic_InputQueue != NULL)
2470 {
2471 transport_data->topics_ToSubscribe |= SUBSCRIBE_INPUT_QUEUE_TOPIC;
2472 }
2473 }
2474 }
2475 }
2476 }
2477 }
2478 return result;
2479}
2480
2481// At handle creation time, we don't have all the fields required for building up the user name (e.g. productID)
2482// We build up as much of the string as we can at this point because we do not store upperConfig after initialization.
2483// In buildConfigForUsernameStep2IfNeeded, only immediately before we do CONNECT itself, do we complete building up this string.
2484static STRING_HANDLE buildConfigForUsernameStep1(const IOTHUB_CLIENT_CONFIG* upperConfig, const char* moduleId)
2485{
2486 if (moduleId == NULL)
2487 {
2488 return STRING_construct_sprintf("%s.%s/%s/", upperConfig->iotHubName, upperConfig->iotHubSuffix, upperConfig->deviceId);
2489 }
2490 else
2491 {
2492 return STRING_construct_sprintf("%s.%s/%s/%s/", upperConfig->iotHubName, upperConfig->iotHubSuffix, upperConfig->deviceId, moduleId);
2493 }
2494}
2495
2496static STRING_HANDLE buildMqttEventString(const char* device_id, const char* module_id)
2497{
2498 if (module_id == NULL)
2499 {
2500 return STRING_construct_sprintf(TOPIC_DEVICE_DEVICE, device_id);
2501 }
2502 else
2503 {
2504 return STRING_construct_sprintf(TOPIC_DEVICE_DEVICE_MODULE, device_id, module_id);
2505 }
2506}
2507
2508static STRING_HANDLE buildDevicesAndModulesPath(const IOTHUB_CLIENT_CONFIG* upperConfig, const char* moduleId)
2509{
2510 if (moduleId == NULL)
2511 {
2512 return STRING_construct_sprintf("%s.%s/devices/%s", upperConfig->iotHubName, upperConfig->iotHubSuffix, upperConfig->deviceId);
2513 }
2514 else
2515 {
2516 return STRING_construct_sprintf("%s.%s/devices/%s/modules/%s", upperConfig->iotHubName, upperConfig->iotHubSuffix, upperConfig->deviceId, moduleId);
2517 }
2518}
2519
2520static PMQTTTRANSPORT_HANDLE_DATA InitializeTransportHandleData(const IOTHUB_CLIENT_CONFIG* upperConfig, PDLIST_ENTRY waitingToSend, IOTHUB_AUTHORIZATION_HANDLE auth_module, const char* moduleId)
2521{
2522 PMQTTTRANSPORT_HANDLE_DATA state = (PMQTTTRANSPORT_HANDLE_DATA)malloc(sizeof(MQTTTRANSPORT_HANDLE_DATA));
2523 if (state == NULL)
2524 {
2525 LogError("Could not create MQTT transport state. Memory allocation failed.");
2526 }
2527 else
2528 {
2529 memset(state, 0, sizeof(MQTTTRANSPORT_HANDLE_DATA));
2530 if ((state->msgTickCounter = tickcounter_create()) == NULL)
2531 {
2532 LogError("Invalid Argument: iotHubName is empty");
2533 free_transport_handle_data(state);
2534 state = NULL;
2535 }
2536 // Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_09_005: [ MQTT transport shall use EXPONENTIAL_WITH_BACK_OFF as default retry policy ]
2537 else if ((state->retry_control_handle = retry_control_create(DEFAULT_RETRY_POLICY, DEFAULT_RETRY_TIMEOUT_IN_SECONDS)) == NULL)
2538 {
2539 LogError("Failed creating default retry control");
2540 free_transport_handle_data(state);
2541 state = NULL;
2542 }
2543 else if ((state->device_id = STRING_construct(upperConfig->deviceId)) == NULL)
2544 {
2545 LogError("failure constructing device_id.");
2546 free_transport_handle_data(state);
2547 state = NULL;
2548 }
2549 else if ((moduleId != NULL) && ((state->module_id = STRING_construct(moduleId)) == NULL))
2550 {
2551 LogError("failure constructing module_id.");
2552 free_transport_handle_data(state);
2553 state = NULL;
2554 }
2555 else if ((state->devicesAndModulesPath = buildDevicesAndModulesPath(upperConfig, moduleId)) == NULL)
2556 {
2557 LogError("failure constructing devicesPath.");
2558 free_transport_handle_data(state);
2559 state = NULL;
2560 }
2561 else
2562 {
2563 if ((state->topic_MqttEvent = buildMqttEventString(upperConfig->deviceId, moduleId)) == NULL)
2564 {
2565 LogError("Could not create topic_MqttEvent for MQTT");
2566 free_transport_handle_data(state);
2567 state = NULL;
2568 }
2569 else
2570 {
2571 state->mqttClient = mqtt_client_init(mqtt_notification_callback, mqtt_operation_complete_callback, state, mqtt_error_callback, state);
2572 if (state->mqttClient == NULL)
2573 {
2574 LogError("failure initializing mqtt client.");
2575 free_transport_handle_data(state);
2576 state = NULL;
2577 }
2578 else
2579 {
2580 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_008: [If the upperConfig contains a valid protocolGatewayHostName value this shall be used for the hostname, otherwise the hostname shall be constructed using the iothubName and iothubSuffix.] */
2581 if (upperConfig->protocolGatewayHostName == NULL)
2582 {
2583 state->hostAddress = STRING_construct_sprintf("%s.%s", upperConfig->iotHubName, upperConfig->iotHubSuffix);
2584 }
2585 else
2586 {
2587 state->hostAddress = STRING_construct(upperConfig->protocolGatewayHostName);
2588 }
2589
2590 if (state->hostAddress == NULL)
2591 {
2592 LogError("failure constructing host address.");
2593 free_transport_handle_data(state);
2594 state = NULL;
2595 }
2596 else if ((state->configPassedThroughUsername = buildConfigForUsernameStep1(upperConfig, moduleId)) == NULL)
2597 {
2598 free_transport_handle_data(state);
2599 state = NULL;
2600 }
2601 else
2602 {
2603 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_010: [IoTHubTransport_MQTT_Common_Create shall allocate memory to save its internal state where all topics, hostname, device_id, device_key, sasTokenSr and client handle shall be saved.] */
2604 DList_InitializeListHead(&(state->telemetry_waitingForAck));
2605 DList_InitializeListHead(&(state->ack_waiting_queue));
2606 DList_InitializeListHead(&(state->pending_get_twin_queue));
2607 state->mqttClientStatus = MQTT_CLIENT_STATUS_NOT_CONNECTED;
2608 state->isRecoverableError = true;
2609 state->packetId = 1;
2610 state->waitingToSend = waitingToSend;
2611 state->currPacketState = CONNECT_TYPE;
2612 state->keepAliveValue = DEFAULT_MQTT_KEEPALIVE;
2613 state->connect_timeout_in_sec = DEFAULT_CONNACK_TIMEOUT;
2614 state->topics_ToSubscribe = UNSUBSCRIBE_FROM_TOPIC;
2615 srand((unsigned int)get_time(NULL));
2616 state->authorization_module = auth_module;
2617
2618 state->isDestroyCalled = false;
2619 state->isRetryExpiredCallbackSet = false;
2620 state->isRegistered = false;
2621 state->device_twin_get_sent = false;
2622 state->xioTransport = NULL;
2623 state->portNum = 0;
2624 state->connectFailCount = 0;
2625 state->connectTick = 0;
2626 state->topic_MqttMessage = NULL;
2627 state->topic_GetState = NULL;
2628 state->topic_NotifyState = NULL;
2629 state->topic_DeviceMethods = NULL;
2630 state->topic_InputQueue = NULL;
2631 state->log_trace = state->raw_trace = false;
2632 state->isConnectUsernameSet = false;
2633 state->auto_url_encode_decode = false;
2634 state->conn_attempted = false;
2635 }
2636 }
2637 }
2638 }
2639 }
2640 return state;
2641}
2642
2643TRANSPORT_LL_HANDLE IoTHubTransport_MQTT_Common_Create(const IOTHUBTRANSPORT_CONFIG* config, MQTT_GET_IO_TRANSPORT get_io_transport, TRANSPORT_CALLBACKS_INFO* cb_info, void* ctx)
2644{
2645 PMQTTTRANSPORT_HANDLE_DATA result;
2646 size_t deviceIdSize;
2647
2648 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_001: [If parameter config is NULL then IoTHubTransport_MQTT_Common_Create shall return NULL.] */
2649 /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_07_041: [ if get_io_transport is NULL then IoTHubTransport_MQTT_Common_Create shall return NULL. ] */
2650 if (config == NULL || get_io_transport == NULL || cb_info == NULL)
2651 {
2652 LogError("Invalid Argument config: %p, get_io_transport: %p, cb_info: %p", config, get_io_transport, cb_info);
2653 result = NULL;
2654 }
2655 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_002: [If the parameter config's variables upperConfig, auth_module_handle or waitingToSend are NULL then IoTHubTransport_MQTT_Common_Create shall return NULL.] */
2656 else if (config->auth_module_handle == NULL)
2657 {
2658 LogError("Invalid Argument: auth_module_handle is NULL)");
2659 result = NULL;
2660 }
2661 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_002: [If the parameter config's variables upperConfig or waitingToSend are NULL then IoTHubTransport_MQTT_Common_Create shall return NULL.] */
2662 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_003: [If the upperConfig's variables deviceId, both deviceKey and deviceSasToken, iotHubName, protocol, or iotHubSuffix are NULL then IoTHubTransport_MQTT_Common_Create shall return NULL.] */
2663 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_03_003: [If both deviceKey & deviceSasToken fields are NOT NULL then IoTHubTransport_MQTT_Common_Create shall return NULL.] */
2664 else if (config->upperConfig == NULL ||
2665 config->upperConfig->protocol == NULL ||
2666 config->upperConfig->deviceId == NULL ||
2667 ((config->upperConfig->deviceKey != NULL) && (config->upperConfig->deviceSasToken != NULL)) ||
2668 config->upperConfig->iotHubName == NULL ||
2669 config->upperConfig->iotHubSuffix == NULL)
2670 {
2671 LogError("Invalid Argument: upperConfig structure contains an invalid parameter");
2672 result = NULL;
2673 }
2674 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_002: [If the parameter config's variables upperConfig, auth_module_handle or waitingToSend are NULL then IoTHubTransport_MQTT_Common_Create shall return NULL.] */
2675 else if (config->waitingToSend == NULL)
2676 {
2677 LogError("Invalid Argument: waitingToSend is NULL)");
2678 result = NULL;
2679 }
2680 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_006: [If the upperConfig's variables deviceId is an empty strings or length is greater then 128 then IoTHubTransport_MQTT_Common_Create shall return NULL.] */
2681 else if (((deviceIdSize = strlen(config->upperConfig->deviceId)) > 128U) || (deviceIdSize == 0))
2682 {
2683 LogError("Invalid Argument: DeviceId is of an invalid size");
2684 result = NULL;
2685 }
2686 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_003: [If the upperConfig's variables deviceId, both deviceKey and deviceSasToken, iotHubName, protocol, or iotHubSuffix are NULL then IoTHubTransport_MQTT_Common_Create shall return NULL.] */
2687 else if ((config->upperConfig->deviceKey != NULL) && (strlen(config->upperConfig->deviceKey) == 0))
2688 {
2689 LogError("Invalid Argument: deviceKey is empty");
2690 result = NULL;
2691 }
2692 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_003: [If the upperConfig's variables deviceId, both deviceKey and deviceSasToken, iotHubName, protocol, or iotHubSuffix are NULL then IoTHubTransport_MQTT_Common_Create shall return NULL.] */
2693 else if ((config->upperConfig->deviceSasToken != NULL) && (strlen(config->upperConfig->deviceSasToken) == 0))
2694 {
2695 LogError("Invalid Argument: deviceSasToken is empty");
2696 result = NULL;
2697 }
2698 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_003: [If the upperConfig's variables deviceId, deviceKey, iotHubName, protocol, or iotHubSuffix are NULL then IoTHubTransport_MQTT_Common_Create shall return NULL.] */
2699 else if (strlen(config->upperConfig->iotHubName) == 0)
2700 {
2701 LogError("Invalid Argument: iotHubName is empty");
2702 result = NULL;
2703 }
2704 else if (IoTHub_Transport_ValidateCallbacks(cb_info) != 0)
2705 {
2706 LogError("failure checking transport callbacks");
2707 result = NULL;
2708 }
2709 else
2710 {
2711 result = InitializeTransportHandleData(config->upperConfig, config->waitingToSend, config->auth_module_handle, config->moduleId);
2712 if (result != NULL)
2713 {
2714 result->get_io_transport = get_io_transport;
2715 result->http_proxy_hostname = NULL;
2716 result->http_proxy_username = NULL;
2717 result->http_proxy_password = NULL;
2718
2719 result->transport_ctx = ctx;
2720 memcpy(&result->transport_callbacks, cb_info, sizeof(TRANSPORT_CALLBACKS_INFO));
2721 }
2722 }
2723 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_009: [If any error is encountered then IoTHubTransport_MQTT_Common_Create shall return NULL.] */
2724 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_011: [On Success IoTHubTransport_MQTT_Common_Create shall return a non-NULL value.] */
2725 return result;
2726}
2727
2728void IoTHubTransport_MQTT_Common_Destroy(TRANSPORT_LL_HANDLE handle)
2729{
2730 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_012: [IoTHubTransport_MQTT_Common_Destroy shall do nothing if parameter handle is NULL.] */
2731 PMQTTTRANSPORT_HANDLE_DATA transport_data = (PMQTTTRANSPORT_HANDLE_DATA)handle;
2732 if (transport_data != NULL)
2733 {
2734 transport_data->isDestroyCalled = true;
2735
2736 DisconnectFromClient(transport_data);
2737
2738 //Empty the Waiting for Ack Messages.
2739 while (!DList_IsListEmpty(&transport_data->telemetry_waitingForAck))
2740 {
2741 PDLIST_ENTRY currentEntry = DList_RemoveHeadList(&transport_data->telemetry_waitingForAck);
2742 MQTT_MESSAGE_DETAILS_LIST* mqttMsgEntry = containingRecord(currentEntry, MQTT_MESSAGE_DETAILS_LIST, entry);
2743 sendMsgComplete(mqttMsgEntry->iotHubMessageEntry, transport_data, IOTHUB_CLIENT_CONFIRMATION_BECAUSE_DESTROY);
2744 free(mqttMsgEntry);
2745 }
2746 while (!DList_IsListEmpty(&transport_data->ack_waiting_queue))
2747 {
2748 PDLIST_ENTRY currentEntry = DList_RemoveHeadList(&transport_data->ack_waiting_queue);
2749 MQTT_DEVICE_TWIN_ITEM* mqtt_device_twin = containingRecord(currentEntry, MQTT_DEVICE_TWIN_ITEM, entry);
2750
2751 if (mqtt_device_twin->userCallback == NULL)
2752 {
2753 transport_data->transport_callbacks.twin_rpt_state_complete_cb(mqtt_device_twin->iothub_msg_id, STATUS_CODE_TIMEOUT_VALUE, transport_data->transport_ctx);
2754 }
2755 else
2756 {
2757 mqtt_device_twin->userCallback(DEVICE_TWIN_UPDATE_COMPLETE, NULL, 0, mqtt_device_twin->userContext);
2758 }
2759
2760 destroy_device_twin_get_message(mqtt_device_twin);
2761 }
2762 while (!DList_IsListEmpty(&transport_data->pending_get_twin_queue))
2763 {
2764 PDLIST_ENTRY currentEntry = DList_RemoveHeadList(&transport_data->pending_get_twin_queue);
2765
2766 MQTT_DEVICE_TWIN_ITEM* mqtt_device_twin = containingRecord(currentEntry, MQTT_DEVICE_TWIN_ITEM, entry);
2767
2768 mqtt_device_twin->userCallback(DEVICE_TWIN_UPDATE_COMPLETE, NULL, 0, mqtt_device_twin->userContext);
2769
2770 destroy_device_twin_get_message(mqtt_device_twin);
2771 }
2772
2773 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_014: [IoTHubTransport_MQTT_Common_Destroy shall free all the resources currently in use.] */
2774 /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_01_012: [ `IoTHubTransport_MQTT_Common_Destroy` shall free the stored proxy options. ]*/
2775 free_transport_handle_data(transport_data);
2776 }
2777}
2778
2779int IoTHubTransport_MQTT_Common_Subscribe_DeviceTwin(TRANSPORT_LL_HANDLE handle)
2780{
2781 int result;
2782 PMQTTTRANSPORT_HANDLE_DATA transport_data = (PMQTTTRANSPORT_HANDLE_DATA)handle;
2783 if (transport_data == NULL)
2784 {
2785 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_042: [If the parameter handle is NULL than IoTHubTransport_MQTT_Common_Subscribe shall return a non-zero value.] */
2786 LogError("Invalid handle parameter. NULL.");
2787 result = MU_FAILURE;
2788 }
2789 else
2790 {
2791 if (transport_data->topic_GetState == NULL)
2792 {
2793 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_044: [IoTHubTransport_MQTT_Common_Subscribe_DeviceTwin shall construct the get state topic string and the notify state topic string.] */
2794 transport_data->topic_GetState = STRING_construct(TOPIC_GET_DESIRED_STATE);
2795 if (transport_data->topic_GetState == NULL)
2796 {
2797 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_046: [Upon failure IoTHubTransport_MQTT_Common_Subscribe_DeviceTwin shall return a non-zero value.] */
2798 LogError("Failure: unable constructing reported state topic");
2799 result = MU_FAILURE;
2800 }
2801 else
2802 {
2803 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_047: [On success IoTHubTransport_MQTT_Common_Subscribe_DeviceTwin shall return 0.] */
2804 transport_data->topics_ToSubscribe |= SUBSCRIBE_GET_REPORTED_STATE_TOPIC;
2805 result = 0;
2806 }
2807 }
2808 else
2809 {
2810 result = 0;
2811 }
2812 if (result == 0)
2813 {
2814 changeStateToSubscribeIfAllowed(transport_data);
2815 }
2816 }
2817 return result;
2818}
2819
2820void IoTHubTransport_MQTT_Common_Unsubscribe_DeviceTwin(TRANSPORT_LL_HANDLE handle)
2821{
2822 PMQTTTRANSPORT_HANDLE_DATA transport_data = (PMQTTTRANSPORT_HANDLE_DATA)handle;
2823 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_048: [If the parameter handle is NULL than IoTHubTransport_MQTT_Common_Unsubscribe_DeviceTwin shall do nothing.] */
2824 if (transport_data != NULL)
2825 {
2826 if (transport_data->topic_GetState != NULL)
2827 {
2828 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_049: [If subscribe_state is set to IOTHUB_DEVICE_TWIN_DESIRED_STATE then IoTHubTransport_MQTT_Common_Unsubscribe_DeviceTwin shall unsubscribe from the topic_GetState to the mqtt client.] */
2829 transport_data->topics_ToSubscribe &= ~SUBSCRIBE_GET_REPORTED_STATE_TOPIC;
2830 STRING_delete(transport_data->topic_GetState);
2831 transport_data->topic_GetState = NULL;
2832 }
2833 if (transport_data->topic_NotifyState != NULL)
2834 {
2835 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_050: [If subscribe_state is set to IOTHUB_DEVICE_TWIN_NOTIFICATION_STATE then IoTHubTransport_MQTT_Common_Unsubscribe_DeviceTwin shall unsubscribe from the topic_NotifyState to the mqtt client.] */
2836 transport_data->topics_ToSubscribe &= ~SUBSCRIBE_NOTIFICATION_STATE_TOPIC;
2837 STRING_delete(transport_data->topic_NotifyState);
2838 transport_data->topic_NotifyState = NULL;
2839 }
2840 }
2841 else
2842 {
2843 LogError("Invalid argument to unsubscribe (handle is NULL).");
2844 }
2845}
2846
2847IOTHUB_CLIENT_RESULT IoTHubTransport_MQTT_Common_GetTwinAsync(IOTHUB_DEVICE_HANDLE handle, IOTHUB_CLIENT_DEVICE_TWIN_CALLBACK completionCallback, void* callbackContext)
2848{
2849 IOTHUB_CLIENT_RESULT result;
2850
2851 // Codes_SRS_IOTHUB_MQTT_TRANSPORT_09_001: [ If `handle` or `completionCallback` are `NULL` than `IoTHubTransport_MQTT_Common_GetTwinAsync` shall return IOTHUB_CLIENT_INVALID_ARG. ]
2852 if (handle == NULL || completionCallback == NULL)
2853 {
2854 LogError("Invalid argument (handle=%p, completionCallback=%p)", handle, completionCallback);
2855 result = IOTHUB_CLIENT_INVALID_ARG;
2856 }
2857 else
2858 {
2859 PMQTTTRANSPORT_HANDLE_DATA transport_data = (PMQTTTRANSPORT_HANDLE_DATA)handle;
2860 MQTT_DEVICE_TWIN_ITEM* mqtt_info;
2861
2862 if ((mqtt_info = create_device_twin_message(transport_data, RETRIEVE_PROPERTIES, 0)) == NULL)
2863 {
2864 LogError("Failed creating the device twin get request message");
2865 // Codes_SRS_IOTHUB_MQTT_TRANSPORT_09_003: [ If any failure occurs, IoTHubTransport_MQTT_Common_GetTwinAsync shall return IOTHUB_CLIENT_ERROR ]
2866 result = IOTHUB_CLIENT_ERROR;
2867 }
2868 else if (tickcounter_get_current_ms(transport_data->msgTickCounter, &mqtt_info->msgCreationTime) != 0)
2869 {
2870 LogError("Failed setting the get twin request enqueue time");
2871 destroy_device_twin_get_message(mqtt_info);
2872 // Codes_SRS_IOTHUB_MQTT_TRANSPORT_09_003: [ If any failure occurs, IoTHubTransport_MQTT_Common_GetTwinAsync shall return IOTHUB_CLIENT_ERROR ]
2873 result = IOTHUB_CLIENT_ERROR;
2874 }
2875 else
2876 {
2877 mqtt_info->userCallback = completionCallback;
2878 mqtt_info->userContext = callbackContext;
2879
2880 // Codes_SRS_IOTHUB_MQTT_TRANSPORT_09_002: [ The request shall be queued to be sent when the transport is connected, through DoWork ]
2881 DList_InsertTailList(&transport_data->pending_get_twin_queue, &mqtt_info->entry);
2882
2883 // Codes_SRS_IOTHUB_MQTT_TRANSPORT_09_004: [ If no failure occurs, IoTHubTransport_MQTT_Common_GetTwinAsync shall return IOTHUB_CLIENT_OK ]
2884 result = IOTHUB_CLIENT_OK;
2885 }
2886 }
2887
2888 return result;
2889}
2890
2891int IoTHubTransport_MQTT_Common_Subscribe_DeviceMethod(IOTHUB_DEVICE_HANDLE handle)
2892{
2893 int result;
2894 PMQTTTRANSPORT_HANDLE_DATA transport_data = (PMQTTTRANSPORT_HANDLE_DATA)handle;
2895
2896 if (transport_data == NULL)
2897 {
2898 /*Codes_SRS_IOTHUB_MQTT_TRANSPORT_12_001 : [If the parameter handle is NULL than IoTHubTransport_MQTT_Common_Subscribe_DeviceMethod shall return a non - zero value.]*/
2899 LogError("Invalid handle parameter. NULL.");
2900 result = MU_FAILURE;
2901 }
2902 else
2903 {
2904 if (transport_data->topic_DeviceMethods == NULL)
2905 {
2906 /*Codes_SRS_IOTHUB_MQTT_TRANSPORT_12_004 : [IoTHubTransport_MQTT_Common_Subscribe_DeviceMethod shall construct the DEVICE_METHOD topic string for subscribe.]*/
2907 transport_data->topic_DeviceMethods = STRING_construct(TOPIC_DEVICE_METHOD_SUBSCRIBE);
2908 if (transport_data->topic_DeviceMethods == NULL)
2909 {
2910 /*Codes_SRS_IOTHUB_MQTT_TRANSPORT_12_006 : [Upon failure IoTHubTransport_MQTT_Common_Subscribe_DeviceMethod shall return a non - zero value.]*/
2911 LogError("Failure: unable constructing device method subscribe topic");
2912 result = MU_FAILURE;
2913 }
2914 else
2915 {
2916 /*Codes_SRS_IOTHUB_MQTT_TRANSPORT_12_003 : [IoTHubTransport_MQTT_Common_Subscribe_DeviceMethod shall set the signaling flag for DEVICE_METHOD topic for the receiver's topic list. ]*/
2917 transport_data->topics_ToSubscribe |= SUBSCRIBE_DEVICE_METHOD_TOPIC;
2918 result = 0;
2919 }
2920 }
2921 else
2922 {
2923 /*Codes_SRS_IOTHUB_MQTT_TRANSPORT_12_002 : [If the MQTT transport has been previously subscribed to DEVICE_METHOD topic IoTHubTransport_MQTT_Common_Subscribe_DeviceMethod shall do nothing and return 0.]*/
2924 result = 0;
2925 }
2926
2927 if (result == 0)
2928 {
2929 /*Codes_SRS_IOTHUB_MQTT_TRANSPORT_12_005 : [IoTHubTransport_MQTT_Common_Subscribe_DeviceMethod shall schedule the send of the subscription.]*/
2930 /*Codes_SRS_IOTHUB_MQTT_TRANSPORT_12_007 : [On success IoTHubTransport_MQTT_Common_Subscribe_DeviceMethod shall return 0.]*/
2931 changeStateToSubscribeIfAllowed(transport_data);
2932 }
2933 }
2934 return result;
2935}
2936
2937void IoTHubTransport_MQTT_Common_Unsubscribe_DeviceMethod(TRANSPORT_LL_HANDLE handle)
2938{
2939 PMQTTTRANSPORT_HANDLE_DATA transport_data = (PMQTTTRANSPORT_HANDLE_DATA)handle;
2940 /*Codes_SRS_IOTHUB_MQTT_TRANSPORT_12_008 : [If the parameter handle is NULL than IoTHubTransport_MQTT_Common_Unsubscribe_DeviceMethod shall do nothing and return.]*/
2941 if (transport_data != NULL)
2942 {
2943 /*Codes_SRS_IOTHUB_MQTT_TRANSPORT_12_009 : [If the MQTT transport has not been subscribed to DEVICE_METHOD topic IoTHubTransport_MQTT_Common_Unsubscribe_DeviceMethod shall do nothing and return.]*/
2944 if (transport_data->topic_DeviceMethods != NULL)
2945 {
2946 /*Codes_SRS_IOTHUB_MQTT_TRANSPORT_12_010 : [IoTHubTransport_MQTT_Common_Unsubscribe_DeviceMethod shall construct the DEVICE_METHOD topic string for unsubscribe.]*/
2947 const char* unsubscribe[1];
2948 unsubscribe[0] = STRING_c_str(transport_data->topic_DeviceMethods);
2949
2950 /*Codes_SRS_IOTHUB_MQTT_TRANSPORT_12_011 : [IoTHubTransport_MQTT_Common_Unsubscribe_DeviceMethod shall send the unsubscribe.]*/
2951 if (mqtt_client_unsubscribe(transport_data->mqttClient, get_next_packet_id(transport_data), unsubscribe, 1) != 0)
2952 {
2953 LogError("Failure calling mqtt_client_unsubscribe");
2954 }
2955
2956 /*Codes_SRS_IOTHUB_MQTT_TRANSPORT_12_012 : [IoTHubTransport_MQTT_Common_Unsubscribe_DeviceMethod shall removes the signaling flag for DEVICE_METHOD topic from the receiver's topic list. ]*/
2957 STRING_delete(transport_data->topic_DeviceMethods);
2958 transport_data->topic_DeviceMethods = NULL;
2959 transport_data->topics_ToSubscribe &= ~SUBSCRIBE_DEVICE_METHOD_TOPIC;
2960 }
2961 }
2962 else
2963 {
2964 LogError("Invalid argument to unsubscribe (NULL).");
2965 }
2966}
2967
2968int IoTHubTransport_MQTT_Common_DeviceMethod_Response(TRANSPORT_LL_HANDLE handle, METHOD_HANDLE methodId, const unsigned char* response, size_t respSize, int status)
2969{
2970 int result;
2971 MQTTTRANSPORT_HANDLE_DATA* transport_data = (MQTTTRANSPORT_HANDLE_DATA*)handle;
2972 if (transport_data != NULL)
2973 {
2974 /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_07_042: [ IoTHubTransport_MQTT_Common_DeviceMethod_Response shall publish an mqtt message for the device method response. ] */
2975 DEVICE_METHOD_INFO* dev_method_info = (DEVICE_METHOD_INFO*)methodId;
2976 if (dev_method_info == NULL)
2977 {
2978 LogError("Failure: DEVICE_METHOD_INFO was NULL");
2979 result = MU_FAILURE;
2980 }
2981 else
2982 {
2983 if (publish_device_method_message(transport_data, status, dev_method_info->request_id, response, respSize) != 0)
2984 {
2985 /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_07_051: [ If any error is encountered, IoTHubTransport_MQTT_Common_DeviceMethod_Response shall return a non-zero value. ] */
2986 LogError("Failure: publishing device method response");
2987 result = MU_FAILURE;
2988 }
2989 else
2990 {
2991 result = 0;
2992 }
2993 STRING_delete(dev_method_info->request_id);
2994 free(dev_method_info);
2995 }
2996 }
2997 else
2998 {
2999 /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_07_041: [ If the parameter handle is NULL than IoTHubTransport_MQTT_Common_DeviceMethod_Response shall return a non-zero value. ] */
3000 result = MU_FAILURE;
3001 LogError("Failure: invalid TRANSPORT_LL_HANDLE parameter specified");
3002 }
3003 return result;
3004}
3005
3006static STRING_HANDLE buildTopicMqttMessage(const char* device_id, const char* module_id)
3007{
3008 if (module_id == NULL)
3009 {
3010 return STRING_construct_sprintf(TOPIC_DEVICE_MSG, device_id);
3011 }
3012 else
3013 {
3014 return STRING_construct_sprintf(TOPIC_DEVICE_MODULE_MSG, device_id, module_id);
3015 }
3016}
3017
3018int IoTHubTransport_MQTT_Common_Subscribe(TRANSPORT_LL_HANDLE handle)
3019{
3020 int result;
3021 PMQTTTRANSPORT_HANDLE_DATA transport_data = (PMQTTTRANSPORT_HANDLE_DATA)handle;
3022 if (transport_data == NULL)
3023 {
3024 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_015: [If parameter handle is NULL than IoTHubTransport_MQTT_Common_Subscribe shall return a non-zero value.] */
3025 LogError("Invalid handle parameter. NULL.");
3026 result = MU_FAILURE;
3027 }
3028 else
3029 {
3030 /* Code_SRS_IOTHUB_MQTT_TRANSPORT_07_016: [IoTHubTransport_MQTT_Common_Subscribe shall set a flag to enable mqtt_client_subscribe to be called to subscribe to the Message Topic.] */
3031 transport_data->topic_MqttMessage = buildTopicMqttMessage(STRING_c_str(transport_data->device_id), STRING_c_str(transport_data->module_id));
3032 if (transport_data->topic_MqttMessage == NULL)
3033 {
3034 LogError("Failure constructing Message Topic");
3035 result = MU_FAILURE;
3036 }
3037 else
3038 {
3039 transport_data->topics_ToSubscribe |= SUBSCRIBE_TELEMETRY_TOPIC;
3040 /* Code_SRS_IOTHUB_MQTT_TRANSPORT_07_035: [If current packet state is not CONNACT, DISCONNECT_TYPE, or PACKET_TYPE_ERROR then IoTHubTransport_MQTT_Common_Subscribe shall set the packet state to SUBSCRIBE_TYPE.]*/
3041 changeStateToSubscribeIfAllowed(transport_data);
3042 result = 0;
3043 }
3044 }
3045 return result;
3046}
3047
3048void IoTHubTransport_MQTT_Common_Unsubscribe(TRANSPORT_LL_HANDLE handle)
3049{
3050 PMQTTTRANSPORT_HANDLE_DATA transport_data = (PMQTTTRANSPORT_HANDLE_DATA)handle;
3051 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_019: [If parameter handle is NULL then IoTHubTransport_MQTT_Common_Unsubscribe shall do nothing.] */
3052 if (transport_data != NULL)
3053 {
3054 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_020: [IoTHubTransport_MQTT_Common_Unsubscribe shall call mqtt_client_unsubscribe to unsubscribe the mqtt message topic.] */
3055 const char* unsubscribe[1];
3056 unsubscribe[0] = STRING_c_str(transport_data->topic_MqttMessage);
3057 if (mqtt_client_unsubscribe(transport_data->mqttClient, get_next_packet_id(transport_data), unsubscribe, 1) != 0)
3058 {
3059 LogError("Failure calling mqtt_client_unsubscribe");
3060 }
3061 STRING_delete(transport_data->topic_MqttMessage);
3062 transport_data->topic_MqttMessage = NULL;
3063 transport_data->topics_ToSubscribe &= ~SUBSCRIBE_TELEMETRY_TOPIC;
3064 }
3065 else
3066 {
3067 LogError("Invalid argument to unsubscribe (NULL).");
3068 }
3069}
3070
3071IOTHUB_PROCESS_ITEM_RESULT IoTHubTransport_MQTT_Common_ProcessItem(TRANSPORT_LL_HANDLE handle, IOTHUB_IDENTITY_TYPE item_type, IOTHUB_IDENTITY_INFO* iothub_item)
3072{
3073 IOTHUB_PROCESS_ITEM_RESULT result;
3074 /* Codes_SRS_IOTHUBCLIENT_LL_07_001: [ If handle or iothub_item are NULL then IoTHubTransport_MQTT_Common_ProcessItem shall return IOTHUB_PROCESS_ERROR. ]*/
3075 if (handle == NULL || iothub_item == NULL)
3076 {
3077 LogError("Invalid handle parameter iothub_item=%p", iothub_item);
3078 result = IOTHUB_PROCESS_ERROR;
3079 }
3080 else
3081 {
3082 PMQTTTRANSPORT_HANDLE_DATA transport_data = (PMQTTTRANSPORT_HANDLE_DATA)handle;
3083
3084 if (transport_data->currPacketState == PUBLISH_TYPE)
3085 {
3086 // Ensure the reported property suback has been received
3087 if (item_type == IOTHUB_TYPE_DEVICE_TWIN && transport_data->twin_resp_sub_recv)
3088 {
3089 MQTT_DEVICE_TWIN_ITEM* mqtt_info = create_device_twin_message(transport_data, REPORTED_STATE, iothub_item->device_twin->item_id);
3090 if (mqtt_info == NULL)
3091 {
3092 /* Codes_SRS_IOTHUBCLIENT_LL_07_004: [ If any errors are encountered IoTHubTransport_MQTT_Common_ProcessItem shall return IOTHUB_PROCESS_ERROR. ]*/
3093 result = IOTHUB_PROCESS_ERROR;
3094 }
3095 else
3096 {
3097 /*Codes_SRS_IOTHUBCLIENT_LL_07_003: [ IoTHubTransport_MQTT_Common_ProcessItem shall publish a message to the mqtt protocol with the message topic for the message type.]*/
3098 /* Codes_SRS_IOTHUBCLIENT_LL_07_005: [ If successful IoTHubTransport_MQTT_Common_ProcessItem shall add mqtt info structure acknowledgement queue. ] */
3099 DList_InsertTailList(&transport_data->ack_waiting_queue, &mqtt_info->entry);
3100
3101 if (publish_device_twin_message(transport_data, iothub_item->device_twin, mqtt_info) != 0)
3102 {
3103 DList_RemoveEntryList(&mqtt_info->entry);
3104
3105 free(mqtt_info);
3106 /* Codes_SRS_IOTHUBCLIENT_LL_07_004: [ If any errors are encountered IoTHubTransport_MQTT_Common_ProcessItem shall return IOTHUB_PROCESS_ERROR. ]*/
3107 result = IOTHUB_PROCESS_ERROR;
3108 }
3109 else
3110 {
3111 result = IOTHUB_PROCESS_OK;
3112 }
3113 }
3114 }
3115 else
3116 {
3117 /* Codes_SRS_IOTHUBCLIENT_LL_07_006: [ If the item_type is not a supported type IoTHubTransport_MQTT_Common_ProcessItem shall return IOTHUB_PROCESS_CONTINUE. ]*/
3118 result = IOTHUB_PROCESS_CONTINUE;
3119 }
3120 }
3121 else
3122 {
3123 /* Codes_SRS_IOTHUBCLIENT_LL_07_002: [ If the mqtt is not ready to publish messages IoTHubTransport_MQTT_Common_ProcessItem shall return IOTHUB_PROCESS_NOT_CONNECTED. ] */
3124 result = IOTHUB_PROCESS_NOT_CONNECTED;
3125 }
3126 }
3127 return result;
3128}
3129
3130/* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_054: [ IoTHubTransport_MQTT_Common_DoWork shall subscribe to the Notification and get_state Topics if they are defined. ] */
3131void IoTHubTransport_MQTT_Common_DoWork(TRANSPORT_LL_HANDLE handle)
3132{
3133 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_026: [IoTHubTransport_MQTT_Common_DoWork shall do nothing if parameter handle and/or iotHubClientHandle is NULL.] */
3134 PMQTTTRANSPORT_HANDLE_DATA transport_data = (PMQTTTRANSPORT_HANDLE_DATA)handle;
3135 if (transport_data != NULL)
3136 {
3137 if (InitializeConnection(transport_data) == 0)
3138 {
3139 if (transport_data->mqttClientStatus == MQTT_CLIENT_STATUS_PENDING_CLOSE)
3140 {
3141 mqtt_client_disconnect(transport_data->mqttClient, NULL, NULL);
3142 transport_data->mqttClientStatus = MQTT_CLIENT_STATUS_NOT_CONNECTED;
3143 }
3144 else if (transport_data->currPacketState == CONNACK_TYPE || transport_data->currPacketState == SUBSCRIBE_TYPE)
3145 {
3146 SubscribeToMqttProtocol(transport_data);
3147 }
3148 else if (transport_data->currPacketState == SUBACK_TYPE)
3149 {
3150 if ((transport_data->topic_NotifyState != NULL || transport_data->topic_GetState != NULL) &&
3151 !transport_data->device_twin_get_sent)
3152 {
3153 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_055: [ IoTHubTransport_MQTT_Common_DoWork shall send a device twin get property message upon successfully retrieving a SUBACK on device twin topics. ] */
3154 MQTT_DEVICE_TWIN_ITEM* mqtt_info;
3155
3156 if ((mqtt_info = create_device_twin_message(transport_data, RETRIEVE_PROPERTIES, 0)) == NULL)
3157 {
3158 LogError("Failure: could not create message for twin get command");
3159 }
3160 else if (publish_device_twin_get_message(transport_data, mqtt_info) == 0)
3161 {
3162 transport_data->device_twin_get_sent = true;
3163 }
3164 else
3165 {
3166 LogError("Failure: sending device twin get property command.");
3167 destroy_device_twin_get_message(mqtt_info);
3168 }
3169 }
3170 // Publish can be called now
3171 transport_data->currPacketState = PUBLISH_TYPE;
3172 }
3173 else if (transport_data->currPacketState == PUBLISH_TYPE)
3174 {
3175 PDLIST_ENTRY currentListEntry = transport_data->waitingToSend->Flink;
3176 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_027: [IoTHubTransport_MQTT_Common_DoWork shall inspect the "waitingToSend" DLIST passed in config structure.] */
3177 while (currentListEntry != transport_data->waitingToSend)
3178 {
3179 IOTHUB_MESSAGE_LIST* iothubMsgList = containingRecord(currentListEntry, IOTHUB_MESSAGE_LIST, entry);
3180 DLIST_ENTRY savedFromCurrentListEntry;
3181 savedFromCurrentListEntry.Flink = currentListEntry->Flink;
3182
3183 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_027: [IoTHubTransport_MQTT_Common_DoWork shall inspect the "waitingToSend" DLIST passed in config structure.] */
3184 size_t messageLength;
3185 const unsigned char* messagePayload = NULL;
3186 if (!RetrieveMessagePayload(iothubMsgList->messageHandle, &messagePayload, &messageLength))
3187 {
3188 (void)(DList_RemoveEntryList(currentListEntry));
3189 sendMsgComplete(iothubMsgList, transport_data, IOTHUB_CLIENT_CONFIRMATION_ERROR);
3190 LogError("Failure result from IoTHubMessage_GetData");
3191 }
3192 else
3193 {
3194 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_029: [IoTHubTransport_MQTT_Common_DoWork shall create a MQTT_MESSAGE_HANDLE and pass this to a call to mqtt_client_publish.] */
3195 MQTT_MESSAGE_DETAILS_LIST* mqttMsgEntry = (MQTT_MESSAGE_DETAILS_LIST*)malloc(sizeof(MQTT_MESSAGE_DETAILS_LIST));
3196 if (mqttMsgEntry == NULL)
3197 {
3198 LogError("Allocation Error: Failure allocating MQTT Message Detail List.");
3199 }
3200 else
3201 {
3202 mqttMsgEntry->retryCount = 0;
3203 mqttMsgEntry->iotHubMessageEntry = iothubMsgList;
3204 mqttMsgEntry->packet_id = get_next_packet_id(transport_data);
3205 if (publish_mqtt_telemetry_msg(transport_data, mqttMsgEntry, messagePayload, messageLength) != 0)
3206 {
3207 (void)(DList_RemoveEntryList(currentListEntry));
3208 sendMsgComplete(iothubMsgList, transport_data, IOTHUB_CLIENT_CONFIRMATION_ERROR);
3209 free(mqttMsgEntry);
3210 }
3211 else
3212 {
3213 // Remove the message from the waiting queue ...
3214 (void)(DList_RemoveEntryList(currentListEntry));
3215 // and add it to the ack queue
3216 DList_InsertTailList(&(transport_data->telemetry_waitingForAck), &(mqttMsgEntry->entry));
3217 }
3218 }
3219 }
3220 currentListEntry = savedFromCurrentListEntry.Flink;
3221 }
3222
3223 if (transport_data->twin_resp_sub_recv)
3224 {
3225 sendPendingGetTwinRequests(transport_data);
3226 }
3227 }
3228 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_030: [IoTHubTransport_MQTT_Common_DoWork shall call mqtt_client_dowork everytime it is called if it is connected.] */
3229 mqtt_client_dowork(transport_data->mqttClient);
3230 }
3231
3232 // Check the ack messages timeouts
3233 process_queued_ack_messages(transport_data);
3234 removeExpiredTwinRequests(transport_data);
3235 }
3236}
3237
3238IOTHUB_CLIENT_RESULT IoTHubTransport_MQTT_Common_GetSendStatus(TRANSPORT_LL_HANDLE handle, IOTHUB_CLIENT_STATUS *iotHubClientStatus)
3239{
3240 IOTHUB_CLIENT_RESULT result;
3241
3242 if (handle == NULL || iotHubClientStatus == NULL)
3243 {
3244 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_023: [IoTHubTransport_MQTT_Common_GetSendStatus shall return IOTHUB_CLIENT_INVALID_ARG if called with NULL parameter.] */
3245 LogError("invalid arument.");
3246 result = IOTHUB_CLIENT_INVALID_ARG;
3247 }
3248 else
3249 {
3250 MQTTTRANSPORT_HANDLE_DATA* handleData = (MQTTTRANSPORT_HANDLE_DATA*)handle;
3251 if (!DList_IsListEmpty(handleData->waitingToSend) || !DList_IsListEmpty(&(handleData->telemetry_waitingForAck)))
3252 {
3253 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_025: [IoTHubTransport_MQTT_Common_GetSendStatus shall return IOTHUB_CLIENT_OK and status IOTHUB_CLIENT_SEND_STATUS_BUSY if there are currently event items to be sent or being sent.] */
3254 *iotHubClientStatus = IOTHUB_CLIENT_SEND_STATUS_BUSY;
3255 }
3256 else
3257 {
3258 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_024: [IoTHubTransport_MQTT_Common_GetSendStatus shall return IOTHUB_CLIENT_OK and status IOTHUB_CLIENT_SEND_STATUS_IDLE if there are currently no event items to be sent or being sent.] */
3259 *iotHubClientStatus = IOTHUB_CLIENT_SEND_STATUS_IDLE;
3260 }
3261 result = IOTHUB_CLIENT_OK;
3262 }
3263 return result;
3264}
3265
3266IOTHUB_CLIENT_RESULT IoTHubTransport_MQTT_Common_SetOption(TRANSPORT_LL_HANDLE handle, const char* option, const void* value)
3267{
3268 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_021: [If any parameter is NULL then IoTHubTransport_MQTT_Common_SetOption shall return IOTHUB_CLIENT_INVALID_ARG.] */
3269 IOTHUB_CLIENT_RESULT result;
3270 if (
3271 (handle == NULL) ||
3272 (option == NULL) ||
3273 (value == NULL)
3274 )
3275 {
3276 result = IOTHUB_CLIENT_INVALID_ARG;
3277 LogError("invalid parameter (NULL) passed to IoTHubTransport_MQTT_Common_SetOption.");
3278 }
3279 else
3280 {
3281 MQTTTRANSPORT_HANDLE_DATA* transport_data = (MQTTTRANSPORT_HANDLE_DATA*)handle;
3282
3283 IOTHUB_CREDENTIAL_TYPE cred_type = IoTHubClient_Auth_Get_Credential_Type(transport_data->authorization_module);
3284
3285 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_031: [If the option parameter is set to "logtrace" then the value shall be a bool_ptr and the value will determine if the mqtt client log is on or off.] */
3286 if (strcmp(OPTION_LOG_TRACE, option) == 0)
3287 {
3288 transport_data->log_trace = *((bool*)value);
3289 mqtt_client_set_trace(transport_data->mqttClient, transport_data->log_trace, transport_data->raw_trace);
3290 result = IOTHUB_CLIENT_OK;
3291 }
3292 else if (strcmp("rawlogtrace", option) == 0)
3293 {
3294 transport_data->raw_trace = *((bool*)value);
3295 mqtt_client_set_trace(transport_data->mqttClient, transport_data->log_trace, transport_data->raw_trace);
3296 result = IOTHUB_CLIENT_OK;
3297 }
3298 else if (strcmp(OPTION_AUTO_URL_ENCODE_DECODE, option) == 0)
3299 {
3300 transport_data->auto_url_encode_decode = *((bool*)value);
3301 result = IOTHUB_CLIENT_OK;
3302 }
3303 else if (strcmp(OPTION_CONNECTION_TIMEOUT, option) == 0)
3304 {
3305 int* connection_time = (int*)value;
3306 if (*connection_time != transport_data->connect_timeout_in_sec)
3307 {
3308 transport_data->connect_timeout_in_sec = (uint16_t)(*connection_time);
3309 }
3310 result = IOTHUB_CLIENT_OK;
3311 }
3312 else if (strcmp(OPTION_KEEP_ALIVE, option) == 0)
3313 {
3314 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_036: [If the option parameter is set to "keepalive" then the value shall be a int_ptr and the value will determine the mqtt keepalive time that is set for pings.] */
3315 int* keepAliveOption = (int*)value;
3316 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_037 : [If the option parameter is set to supplied int_ptr keepalive is the same value as the existing keepalive then IoTHubTransport_MQTT_Common_SetOption shall do nothing.] */
3317 if (*keepAliveOption != transport_data->keepAliveValue)
3318 {
3319 transport_data->keepAliveValue = (uint16_t)(*keepAliveOption);
3320 if (transport_data->mqttClientStatus != MQTT_CLIENT_STATUS_NOT_CONNECTED)
3321 {
3322 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_038: [If the client is connected when the keepalive is set then IoTHubTransport_MQTT_Common_SetOption shall disconnect and reconnect with the specified keepalive value.] */
3323 DisconnectFromClient(transport_data);
3324 }
3325 }
3326 result = IOTHUB_CLIENT_OK;
3327 }
3328 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_039: [If the option parameter is set to "x509certificate" then the value shall be a const char of the certificate to be used for x509.] */
3329 else if ((strcmp(OPTION_X509_CERT, option) == 0) && (cred_type != IOTHUB_CREDENTIAL_TYPE_X509 && cred_type != IOTHUB_CREDENTIAL_TYPE_UNKNOWN))
3330 {
3331 LogError("x509certificate specified, but authentication method is not x509");
3332 result = IOTHUB_CLIENT_INVALID_ARG;
3333 }
3334 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_040: [If the option parameter is set to "x509privatekey" then the value shall be a const char of the RSA Private Key to be used for x509.] */
3335 else if ((strcmp(OPTION_X509_PRIVATE_KEY, option) == 0) && (cred_type != IOTHUB_CREDENTIAL_TYPE_X509 && cred_type != IOTHUB_CREDENTIAL_TYPE_UNKNOWN))
3336 {
3337 LogError("x509privatekey specified, but authentication method is not x509");
3338 result = IOTHUB_CLIENT_INVALID_ARG;
3339 }
3340 else if (strcmp(OPTION_RETRY_INTERVAL_SEC, option) == 0)
3341 {
3342 if (retry_control_set_option(transport_data->retry_control_handle, RETRY_CONTROL_OPTION_INITIAL_WAIT_TIME_IN_SECS, value) != 0)
3343 {
3344 LogError("Failure setting retry interval option");
3345 result = IOTHUB_CLIENT_ERROR;
3346 }
3347 else
3348 {
3349 result = IOTHUB_CLIENT_OK;
3350 }
3351 }
3352 else if (strcmp(OPTION_RETRY_MAX_DELAY_SECS, option) == 0)
3353 {
3354 if (retry_control_set_option(transport_data->retry_control_handle, RETRY_CONTROL_OPTION_MAX_DELAY_IN_SECS, value) != 0)
3355 {
3356 LogError("Failure setting retry max delay option");
3357 result = IOTHUB_CLIENT_ERROR;
3358 }
3359 else
3360 {
3361 result = IOTHUB_CLIENT_OK;
3362 }
3363 }
3364 else if (strcmp(OPTION_HTTP_PROXY, option) == 0)
3365 {
3366 /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_01_001: [ If `option` is `proxy_data`, `value` shall be used as an `HTTP_PROXY_OPTIONS*`. ]*/
3367 HTTP_PROXY_OPTIONS* proxy_options = (HTTP_PROXY_OPTIONS*)value;
3368
3369 if (transport_data->xioTransport != NULL)
3370 {
3371 /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_01_007: [ If the underlying IO has already been created, then `IoTHubTransport_MQTT_Common_SetOption` shall fail and return `IOTHUB_CLIENT_ERROR`. ]*/
3372 LogError("Cannot set proxy option once the underlying IO is created");
3373 result = IOTHUB_CLIENT_ERROR;
3374 }
3375 else if (proxy_options->host_address == NULL)
3376 {
3377 /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_01_003: [ If `host_address` is NULL, `IoTHubTransport_MQTT_Common_SetOption` shall fail and return `IOTHUB_CLIENT_INVALID_ARG`. ]*/
3378 LogError("NULL host_address in proxy options");
3379 result = IOTHUB_CLIENT_INVALID_ARG;
3380 }
3381 /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_01_006: [ If only one of `username` and `password` is NULL, `IoTHubTransport_MQTT_Common_SetOption` shall fail and return `IOTHUB_CLIENT_INVALID_ARG`. ]*/
3382 else if (((proxy_options->username == NULL) || (proxy_options->password == NULL)) &&
3383 (proxy_options->username != proxy_options->password))
3384 {
3385 LogError("Only one of username and password for proxy settings was NULL");
3386 result = IOTHUB_CLIENT_INVALID_ARG;
3387 }
3388 else
3389 {
3390 char* copied_proxy_hostname = NULL;
3391 char* copied_proxy_username = NULL;
3392 char* copied_proxy_password = NULL;
3393
3394 /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_01_002: [ The fields `host_address`, `port`, `username` and `password` shall be saved for later used (needed when creating the underlying IO to be used by the transport). ]*/
3395 transport_data->http_proxy_port = proxy_options->port;
3396 if (mallocAndStrcpy_s(&copied_proxy_hostname, proxy_options->host_address) != 0)
3397 {
3398 /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_01_004: [ If copying `host_address`, `username` or `password` fails, `IoTHubTransport_MQTT_Common_SetOption` shall fail and return `IOTHUB_CLIENT_ERROR`. ]*/
3399 LogError("Cannot copy HTTP proxy hostname");
3400 result = IOTHUB_CLIENT_ERROR;
3401 }
3402 /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_01_005: [ `username` and `password` shall be allowed to be NULL. ]*/
3403 else if ((proxy_options->username != NULL) && (mallocAndStrcpy_s(&copied_proxy_username, proxy_options->username) != 0))
3404 {
3405 /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_01_004: [ If copying `host_address`, `username` or `password` fails, `IoTHubTransport_MQTT_Common_SetOption` shall fail and return `IOTHUB_CLIENT_ERROR`. ]*/
3406 free(copied_proxy_hostname);
3407 LogError("Cannot copy HTTP proxy username");
3408 result = IOTHUB_CLIENT_ERROR;
3409 }
3410 /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_01_005: [ `username` and `password` shall be allowed to be NULL. ]*/
3411 else if ((proxy_options->password != NULL) && (mallocAndStrcpy_s(&copied_proxy_password, proxy_options->password) != 0))
3412 {
3413 /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_01_004: [ If copying `host_address`, `username` or `password` fails, `IoTHubTransport_MQTT_Common_SetOption` shall fail and return `IOTHUB_CLIENT_ERROR`. ]*/
3414 if (copied_proxy_username != NULL)
3415 {
3416 free(copied_proxy_username);
3417 }
3418 free(copied_proxy_hostname);
3419 LogError("Cannot copy HTTP proxy password");
3420 result = IOTHUB_CLIENT_ERROR;
3421 }
3422 else
3423 {
3424 /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_01_009: [ When setting the proxy options succeeds any previously saved proxy options shall be freed. ]*/
3425 free_proxy_data(transport_data);
3426
3427 transport_data->http_proxy_hostname = copied_proxy_hostname;
3428 transport_data->http_proxy_username = copied_proxy_username;
3429 transport_data->http_proxy_password = copied_proxy_password;
3430
3431 /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_01_008: [ If setting the `proxy_data` option suceeds, `IoTHubTransport_MQTT_Common_SetOption` shall return `IOTHUB_CLIENT_OK` ]*/
3432 result = IOTHUB_CLIENT_OK;
3433 }
3434 }
3435 }
3436 else
3437 {
3438 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_039: [If the option parameter is set to "x509certificate" then the value shall be a const char of the certificate to be used for x509.] */
3439 if (((strcmp(OPTION_X509_CERT, option) == 0) || (strcmp(OPTION_X509_PRIVATE_KEY, option) == 0)) && (cred_type != IOTHUB_CREDENTIAL_TYPE_X509))
3440 {
3441 IoTHubClient_Auth_Set_x509_Type(transport_data->authorization_module, true);
3442 }
3443
3444 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_032: [IoTHubTransport_MQTT_Common_SetOption shall pass down the option to xio_setoption if the option parameter is not a known option string for the MQTT transport.] */
3445 if (GetTransportProviderIfNecessary(transport_data) == 0)
3446 {
3447 if (xio_setoption(transport_data->xioTransport, option, value) == 0)
3448 {
3449 result = IOTHUB_CLIENT_OK;
3450 }
3451 else
3452 {
3453 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_132: [IoTHubTransport_MQTT_Common_SetOption shall return IOTHUB_CLIENT_INVALID_ARG xio_setoption fails] */
3454 result = IOTHUB_CLIENT_INVALID_ARG;
3455 }
3456 }
3457 else
3458 {
3459 result = IOTHUB_CLIENT_ERROR;
3460 }
3461 }
3462 }
3463 return result;
3464}
3465
/*
 * Tells whether the module id stored on the transport matches the module id
 * supplied with the device configuration.
 *
 * transportModuleId: module id known to the transport (may be NULL).
 * deviceModuleId:    module id from the device configuration (may be NULL).
 *
 * Returns true when both ids are NULL or when both are non-NULL and compare
 * equal with strcmp; returns false when exactly one of them is NULL or when
 * the strings differ.
 */
static bool check_module_ids_equal(const char* transportModuleId, const char* deviceModuleId)
{
    bool ids_match;

    if ((transportModuleId == NULL) || (deviceModuleId == NULL))
    {
        // At least one id is absent: they only match if both are absent.
        ids_match = (transportModuleId == deviceModuleId);
    }
    else
    {
        ids_match = (strcmp(transportModuleId, deviceModuleId) == 0);
    }

    return ids_match;
}
3485
3486TRANSPORT_LL_HANDLE IoTHubTransport_MQTT_Common_Register(TRANSPORT_LL_HANDLE handle, const IOTHUB_DEVICE_CONFIG* device, PDLIST_ENTRY waitingToSend)
3487{
3488 TRANSPORT_LL_HANDLE result = NULL;
3489
3490 // Codes_SRS_IOTHUB_MQTT_TRANSPORT_17_001: [ IoTHubTransport_MQTT_Common_Register shall return NULL if the TRANSPORT_LL_HANDLE is NULL.]
3491 // Codes_SRS_IOTHUB_MQTT_TRANSPORT_17_002: [ IoTHubTransport_MQTT_Common_Register shall return NULL if device or waitingToSend are NULL.]
3492 if ((handle == NULL) || (device == NULL) || (waitingToSend == NULL))
3493 {
3494 LogError("IoTHubTransport_MQTT_Common_Register: handle, device or waitingToSend is NULL.");
3495 result = NULL;
3496 }
3497 else
3498 {
3499 MQTTTRANSPORT_HANDLE_DATA* transport_data = (MQTTTRANSPORT_HANDLE_DATA*)handle;
3500
3501 // Codes_SRS_IOTHUB_MQTT_TRANSPORT_03_001: [ IoTHubTransport_MQTT_Common_Register shall return NULL if deviceId, or both deviceKey and deviceSasToken are NULL.]
3502 if (device->deviceId == NULL)
3503 {
3504 LogError("IoTHubTransport_MQTT_Common_Register: deviceId is NULL.");
3505 result = NULL;
3506 }
3507 // Codes_SRS_IOTHUB_MQTT_TRANSPORT_03_002: [ IoTHubTransport_MQTT_Common_Register shall return NULL if both deviceKey and deviceSasToken are provided.]
3508 else if ((device->deviceKey != NULL) && (device->deviceSasToken != NULL))
3509 {
3510 LogError("IoTHubTransport_MQTT_Common_Register: Both deviceKey and deviceSasToken are defined. Only one can be used.");
3511 result = NULL;
3512 }
3513 else
3514 {
3515 // Codes_SRS_IOTHUB_MQTT_TRANSPORT_17_003: [ IoTHubTransport_MQTT_Common_Register shall return NULL if deviceId or deviceKey do not match the deviceId and deviceKey passed in during IoTHubTransport_MQTT_Common_Create.]
3516 if (strcmp(STRING_c_str(transport_data->device_id), device->deviceId) != 0)
3517 {
3518 LogError("IoTHubTransport_MQTT_Common_Register: deviceId does not match.");
3519 result = NULL;
3520 }
3521 else if (!check_module_ids_equal(STRING_c_str(transport_data->module_id), device->moduleId))
3522 {
3523 LogError("IoTHubTransport_MQTT_Common_Register: moduleId does not match.");
3524 result = NULL;
3525 }
3526 // Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_43_001: [ IoTHubTransport_MQTT_Common_Register shall return NULL if deviceKey is NULL when credential type is IOTHUB_CREDENTIAL_TYPE_DEVICE_KEY ]
3527 else if (IoTHubClient_Auth_Get_Credential_Type(transport_data->authorization_module) == IOTHUB_CREDENTIAL_TYPE_DEVICE_KEY &&
3528 ((device->deviceKey == NULL) || (strcmp(IoTHubClient_Auth_Get_DeviceKey(transport_data->authorization_module), device->deviceKey) != 0)))
3529 {
3530 LogError("IoTHubTransport_MQTT_Common_Register: deviceKey does not match.");
3531 result = NULL;
3532 }
3533 else
3534 {
3535 if (transport_data->isRegistered == true)
3536 {
3537 LogError("Transport already has device registered by id: [%s]", device->deviceId);
3538 result = NULL;
3539 }
3540 else
3541 {
3542 transport_data->isRegistered = true;
3543 // Codes_SRS_IOTHUB_MQTT_TRANSPORT_17_004: [ IoTHubTransport_MQTT_Common_Register shall return the TRANSPORT_LL_HANDLE as the TRANSPORT_LL_HANDLE. ]
3544 result = (TRANSPORT_LL_HANDLE)handle;
3545 }
3546 }
3547 }
3548 }
3549
3550 return result;
3551}
3552
3553void IoTHubTransport_MQTT_Common_Unregister(TRANSPORT_LL_HANDLE deviceHandle)
3554{
3555 // Codes_SRS_IOTHUB_MQTT_TRANSPORT_17_005: [ If deviceHandle is NULL `IoTHubTransport_MQTT_Common_Unregister` shall do nothing. ]
3556 if (deviceHandle != NULL)
3557 {
3558 MQTTTRANSPORT_HANDLE_DATA* transport_data = (MQTTTRANSPORT_HANDLE_DATA*)deviceHandle;
3559
3560 transport_data->isRegistered = false;
3561 }
3562}
3563
3564STRING_HANDLE IoTHubTransport_MQTT_Common_GetHostname(TRANSPORT_LL_HANDLE handle)
3565{
3566 STRING_HANDLE result;
3567 /*Codes_SRS_IOTHUB_MQTT_TRANSPORT_02_001: [ If handle is NULL then IoTHubTransport_MQTT_Common_GetHostname shall fail and return NULL. ]*/
3568 if (handle == NULL)
3569 {
3570 result = NULL;
3571 }
3572 /*Codes_SRS_IOTHUB_MQTT_TRANSPORT_02_002: [ Otherwise IoTHubTransport_MQTT_Common_GetHostname shall return a non-NULL STRING_HANDLE containg the hostname. ]*/
3573 else if ((result = STRING_clone(((MQTTTRANSPORT_HANDLE_DATA*)(handle))->hostAddress)) == NULL)
3574 {
3575 LogError("Cannot provide the target host name (STRING_clone failed).");
3576 }
3577
3578 return result;
3579}
3580
3581IOTHUB_CLIENT_RESULT IoTHubTransport_MQTT_Common_SendMessageDisposition(MESSAGE_CALLBACK_INFO* message_data, IOTHUBMESSAGE_DISPOSITION_RESULT disposition)
3582{
3583 (void)disposition;
3584
3585 IOTHUB_CLIENT_RESULT result;
3586 if (message_data)
3587 {
3588 if (message_data->messageHandle)
3589 {
3590 IoTHubMessage_Destroy(message_data->messageHandle);
3591 result = IOTHUB_CLIENT_OK;
3592 }
3593 else
3594 {
3595 /*Codes_SRS_IOTHUB_MQTT_TRANSPORT_10_002: [If any of the messageData fields are NULL, IoTHubTransport_MQTT_Common_SendMessageDisposition shall fail and return IOTHUB_CLIENT_ERROR. ]*/
3596 LogError("message handle is NULL");
3597 result = IOTHUB_CLIENT_ERROR;
3598 }
3599 free(message_data);
3600 }
3601 else
3602 {
3603 /*Codes_SRS_IOTHUB_MQTT_TRANSPORT_10_001: [If messageData is NULL, IoTHubTransport_MQTT_Common_SendMessageDisposition shall fail and return IOTHUB_CLIENT_ERROR. ]*/
3604 LogError("message_data is NULL");
3605 result = IOTHUB_CLIENT_ERROR;
3606 }
3607 return result;
3608}
3609
3610
3611int IoTHubTransport_MQTT_Common_Subscribe_InputQueue(TRANSPORT_LL_HANDLE handle)
3612{
3613 int result;
3614 PMQTTTRANSPORT_HANDLE_DATA transport_data = (PMQTTTRANSPORT_HANDLE_DATA)handle;
3615 if (transport_data == NULL)
3616 {
3617 // Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_31_066: [ If parameter handle is NULL than IoTHubTransport_MQTT_Common_Subscribe_InputQueue shall return a non-zero value.]
3618 LogError("Invalid handle parameter. NULL.");
3619 result = MU_FAILURE;
3620 }
3621 else if (transport_data->module_id == NULL)
3622 {
3623 // Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_31_073: [ If module ID is not set on the transpont, IoTHubTransport_MQTT_Common_Unsubscribe_InputQueue shall fail.]
3624 LogError("ModuleID must be specified for input queues. NULL.");
3625 result = MU_FAILURE;
3626 }
3627 else if ((transport_data->topic_InputQueue == NULL) &&
3628 (transport_data->topic_InputQueue = STRING_construct_sprintf(TOPIC_INPUT_QUEUE_NAME, STRING_c_str(transport_data->device_id), STRING_c_str(transport_data->module_id))) == NULL)
3629 {
3630 LogError("Failure constructing Message Topic");
3631 result = MU_FAILURE;
3632 }
3633 else
3634 {
3635 // Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_31_067: [ IoTHubTransport_MQTT_Common_Subscribe_InputQueue shall set a flag to enable mqtt_client_subscribe to be called to subscribe to the input queue Message Topic.]
3636 transport_data->topics_ToSubscribe |= SUBSCRIBE_INPUT_QUEUE_TOPIC;
3637 changeStateToSubscribeIfAllowed(transport_data);
3638 // Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_31_070: [ On success IoTHubTransport_MQTT_Common_Subscribe_InputQueue shall return 0.]
3639 result = 0;
3640 }
3641 return result;
3642}
3643
3644void IoTHubTransport_MQTT_Common_Unsubscribe_InputQueue(TRANSPORT_LL_HANDLE handle)
3645{
3646 PMQTTTRANSPORT_HANDLE_DATA transport_data = (PMQTTTRANSPORT_HANDLE_DATA)handle;
3647 if ((transport_data != NULL) && (transport_data->topic_InputQueue != NULL))
3648 {
3649 // Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_31_072: [ IoTHubTransport_MQTT_Common_Unsubscribe_InputQueue shall call mqtt_client_unsubscribe to unsubscribe the mqtt input queue message topic.]
3650 const char* unsubscribe[1];
3651 unsubscribe[0] = STRING_c_str(transport_data->topic_InputQueue);
3652 if (mqtt_client_unsubscribe(transport_data->mqttClient, get_next_packet_id(transport_data), unsubscribe, 1) != 0)
3653 {
3654 LogError("Failure calling mqtt_client_unsubscribe");
3655 }
3656 STRING_delete(transport_data->topic_InputQueue);
3657 transport_data->topic_InputQueue = NULL;
3658 transport_data->topics_ToSubscribe &= ~SUBSCRIBE_INPUT_QUEUE_TOPIC;
3659 }
3660 else
3661 {
3662 // Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_31_071: [ If parameter handle is NULL then IoTHubTransport_MQTT_Common_Unsubscribe_InputQueue shall do nothing.]
3663 LogError("Invalid argument to unsubscribe input queue (NULL).");
3664 }
3665}
3666
3667int IoTHubTransport_MQTT_SetCallbackContext(TRANSPORT_LL_HANDLE handle, void* ctx)
3668{
3669 int result;
3670 if (handle == NULL)
3671 {
3672 LogError("Invalid parameter specified handle: %p", handle);
3673 result = MU_FAILURE;
3674 }
3675 else
3676 {
3677 MQTTTRANSPORT_HANDLE_DATA* transport_data = (MQTTTRANSPORT_HANDLE_DATA*)handle;
3678 transport_data->transport_ctx = ctx;
3679 result = 0;
3680 }
3681 return result;
3682}
3683
3684int IoTHubTransport_MQTT_GetSupportedPlatformInfo(TRANSPORT_LL_HANDLE handle, PLATFORM_INFO_OPTION* info)
3685{
3686 int result;
3687
3688 if (handle == NULL || info == NULL)
3689 {
3690 LogError("Invalid parameter specified (handle: %p, info: %p)", handle, info);
3691 result = MU_FAILURE;
3692 }
3693 else
3694 {
3695 *info = PLATFORM_INFO_OPTION_RETRIEVE_SQM;
3696 result = 0;
3697 }
3698
3699 return result;
3700}
Note: See TracBrowser for help on using the repository browser.