source: azure_iot_hub/trunk/azure_iohub/iothub_client/src/iothubtransport_mqtt_common.c@ 388

Last change on this file since 388 was 388, checked in by coas-nagasima, 5 years ago

Azure IoT Hub Device C SDK を使ったサンプルの追加

  • Property svn:eol-style set to native
  • Property svn:keywords set to Id
  • Property svn:mime-type set to text/x-csrc
File size: 156.9 KB
Line 
1// Copyright (c) Microsoft. All rights reserved.
2// Licensed under the MIT license. See LICENSE file in the project root for full license information.
3
4#include <stdlib.h>
5#include <ctype.h>
6
7#include "azure_c_shared_utility/optimize_size.h"
8#include "azure_c_shared_utility/gballoc.h"
9#include "azure_c_shared_utility/xlogging.h"
10#include "azure_c_shared_utility/strings.h"
11#include "azure_c_shared_utility/doublylinkedlist.h"
12#include "azure_c_shared_utility/crt_abstractions.h"
13#include "azure_c_shared_utility/agenttime.h"
14#include "azure_c_shared_utility/threadapi.h"
15#include "azure_c_shared_utility/sastoken.h"
16#include "azure_c_shared_utility/tickcounter.h"
17#include "azure_c_shared_utility/tlsio.h"
18#include "azure_c_shared_utility/platform.h"
19#include "azure_c_shared_utility/string_tokenizer.h"
20#include "azure_c_shared_utility/shared_util_options.h"
21#include "azure_c_shared_utility/urlencode.h"
22
23#include "internal/iothub_client_private.h"
24#include "internal/iothub_client_retry_control.h"
25#include "internal/iothub_transport_ll_private.h"
26#include "internal/iothubtransport_mqtt_common.h"
27#include "internal/iothubtransport.h"
28
29#include "azure_umqtt_c/mqtt_client.h"
30
31#include "iothub_client_options.h"
32#include "iothub_client_version.h"
33
34#include <stdarg.h>
35#include <stdio.h>
36
37#include <limits.h>
38#include <inttypes.h>
39
/* Numeric configuration constants for the MQTT transport.
 * Values written as arithmetic are wrapped in parentheses so that they keep
 * their meaning when the macro is expanded inside a larger expression
 * (for example next to '%', '/' or a cast). */
#define SAS_REFRESH_MULTIPLIER              .8
#define EPOCH_TIME_T_VALUE                  0
#define DEFAULT_MQTT_KEEPALIVE              (4*60) // 4 min
#define DEFAULT_CONNACK_TIMEOUT             30 // 30 seconds
#define BUILD_CONFIG_USERNAME               24
#define SAS_TOKEN_DEFAULT_LEN               10
#define RESEND_TIMEOUT_VALUE_MIN            (1*60)
#define MAX_SEND_RECOUNT_LIMIT              2
#define DEFAULT_CONNECTION_INTERVAL         30
#define FAILED_CONN_BACKOFF_VALUE           5
#define STATUS_CODE_FAILURE_VALUE           500
#define STATUS_CODE_TIMEOUT_VALUE           408

#define DEFAULT_RETRY_POLICY                IOTHUB_CLIENT_RETRY_EXPONENTIAL_BACKOFF_WITH_JITTER
#define DEFAULT_RETRY_TIMEOUT_IN_SECONDS    0
#define MAX_DISCONNECT_VALUE                50

#define ON_DEMAND_GET_TWIN_REQUEST_TIMEOUT_SECS    60
58
/* Topic prefixes; declared as arrays so that sizeof() yields their length. */
static const char TOPIC_DEVICE_TWIN_PREFIX[] = "$iothub/twin";
static const char TOPIC_DEVICE_METHOD_PREFIX[] = "$iothub/methods";

/* Topic filters and printf-style topic templates. */
static const char* TOPIC_GET_DESIRED_STATE = "$iothub/twin/res/#";
static const char* TOPIC_NOTIFICATION_STATE = "$iothub/twin/PATCH/properties/desired/#";

static const char* TOPIC_DEVICE_MSG = "devices/%s/messages/devicebound/#";
static const char* TOPIC_DEVICE_MODULE_MSG = "devices/%s/modules/%s/messages/devicebound/#";
static const char* TOPIC_DEVICE_DEVICE = "devices/%s/messages/events/";
static const char* TOPIC_DEVICE_DEVICE_MODULE = "devices/%s/modules/%s/messages/events/";

static const char* TOPIC_INPUT_QUEUE_NAME = "devices/%s/modules/%s/#";

static const char* TOPIC_DEVICE_METHOD_SUBSCRIBE = "$iothub/methods/POST/#";

static const char* IOTHUB_API_VERSION = "2017-11-08-preview";

/* Separator placed between properties appended to a topic string. */
static const char* PROPERTY_SEPARATOR = "&";

/* Templates taking a 16-bit request id (PRIu16). */
static const char* REPORTED_PROPERTIES_TOPIC = "$iothub/twin/PATCH/properties/reported/?$rid=%" PRIu16;
static const char* GET_PROPERTIES_TOPIC = "$iothub/twin/GET/?$rid=%" PRIu16;

/* Template taking an int status code followed by the request id string. */
static const char* DEVICE_METHOD_RESPONSE_TOPIC = "$iothub/methods/res/%d/?$rid=%s";

/* Prefix that introduces the request id inside a topic segment. */
static const char* REQUEST_ID_PROPERTY = "?$rid=";

/* Property keys; the message-related ones are written after a "%24." prefix. */
static const char* MESSAGE_ID_PROPERTY = "mid";
static const char* CORRELATION_ID_PROPERTY = "cid";
static const char* CONTENT_TYPE_PROPERTY = "ct";
static const char* CONTENT_ENCODING_PROPERTY = "ce";
static const char* DIAGNOSTIC_ID_PROPERTY = "diagid";
static const char* DIAGNOSTIC_CONTEXT_PROPERTY = "diagctx";
static const char* CONNECTION_DEVICE_ID = "cdid";
static const char* CONNECTION_MODULE_ID_PROPERTY = "cmid";

/* Key used inside the diagnostic context value. */
static const char* DIAGNOSTIC_CONTEXT_CREATION_TIME_UTC_PROPERTY = "creationtimeutc";
93
/* ASCII-only lower-casing of a single character.
 * Every use of the argument is parenthesized so that an expression argument
 * (e.g. a conditional or bitwise expression) expands correctly. The argument
 * is evaluated more than once, so it must not have side effects. */
#define TOLOWER(c) ((((c) >= 'A') && ((c) <= 'Z')) ? ((c) - 'A' + 'a') : (c))

/* Subscription bit flags: each SUBSCRIBE_*_TOPIC value occupies its own bit,
 * and SUBSCRIBE_TOPIC_COUNT is the number of such flags defined here. */
#define UNSUBSCRIBE_FROM_TOPIC                  0x0000
#define SUBSCRIBE_GET_REPORTED_STATE_TOPIC      0x0001
#define SUBSCRIBE_NOTIFICATION_STATE_TOPIC      0x0002
#define SUBSCRIBE_TELEMETRY_TOPIC               0x0004
#define SUBSCRIBE_DEVICE_METHOD_TOPIC           0x0008
#define SUBSCRIBE_INPUT_QUEUE_TOPIC             0x0010
#define SUBSCRIBE_TOPIC_COUNT                   5
103
104MU_DEFINE_ENUM_STRINGS(MQTT_CLIENT_EVENT_ERROR, MQTT_CLIENT_EVENT_ERROR_VALUES)
105
/* A property-name prefix together with its length in characters
 * (terminating NUL not counted). */
typedef struct SYSTEM_PROPERTY_INFO_TAG
{
    const char* propName;
    size_t propLength;
} SYSTEM_PROPERTY_INFO;

/* Builds one table entry; the length is derived from the literal itself so
 * that name and length cannot drift apart. */
#define SYS_PROP_ENTRY(literal) { literal, sizeof(literal) - 1 }

/* Table of known system property prefixes ("%24" is the URL-encoded '$'). */
static SYSTEM_PROPERTY_INFO sysPropList[] = {
    SYS_PROP_ENTRY("%24.exp"),
    SYS_PROP_ENTRY("%24.mid"),
    SYS_PROP_ENTRY("%24.uid"),
    SYS_PROP_ENTRY("%24.to"),
    SYS_PROP_ENTRY("%24.cid"),
    SYS_PROP_ENTRY("%24.ct"),
    SYS_PROP_ENTRY("%24.ce"),
    SYS_PROP_ENTRY("devices/"),
    SYS_PROP_ENTRY("iothub-operation"),
    SYS_PROP_ENTRY("iothub-ack"),
    SYS_PROP_ENTRY("%24.on"),
    SYS_PROP_ENTRY("%24.cdid"),
    SYS_PROP_ENTRY("%24.cmid")
};

#undef SYS_PROP_ENTRY

static const int slashes_to_reach_input_name = 5;
129
/* Kind of device twin message tracked by a MQTT_DEVICE_TWIN_ITEM. */
typedef enum DEVICE_TWIN_MSG_TYPE_TAG
{
    REPORTED_STATE = 0,
    RETRIEVE_PROPERTIES = 1     /* assigned to twin GET request items */
} DEVICE_TWIN_MSG_TYPE;
135
/* Credential kinds known to the MQTT transport. */
typedef enum MQTT_TRANSPORT_CREDENTIAL_TYPE_TAG
{
    CREDENTIAL_NOT_BUILD = 0,
    X509 = 1,
    SAS_TOKEN_FROM_USER = 2,
    DEVICE_KEY = 3
} MQTT_TRANSPORT_CREDENTIAL_TYPE;
143
/* Connection state values stored in MQTTTRANSPORT_HANDLE_DATA::mqttClientStatus. */
typedef enum MQTT_CLIENT_STATUS_TAG
{
    MQTT_CLIENT_STATUS_NOT_CONNECTED = 0,
    MQTT_CLIENT_STATUS_CONNECTING = 1,
    MQTT_CLIENT_STATUS_CONNECTED = 2,
    MQTT_CLIENT_STATUS_PENDING_CLOSE = 3,
    MQTT_CLIENT_STATUS_EXECUTE_DISCONNECT = 4
} MQTT_CLIENT_STATUS;
152
153typedef struct MQTTTRANSPORT_HANDLE_DATA_TAG
154{
155 // Topic control
156 STRING_HANDLE topic_MqttEvent;
157 STRING_HANDLE topic_MqttMessage;
158 STRING_HANDLE topic_GetState;
159 STRING_HANDLE topic_NotifyState;
160 STRING_HANDLE topic_InputQueue;
161
162 STRING_HANDLE topic_DeviceMethods;
163
164 uint32_t topics_ToSubscribe;
165
166 // Connection related constants
167 STRING_HANDLE hostAddress;
168 STRING_HANDLE device_id;
169 STRING_HANDLE module_id;
170 STRING_HANDLE devicesAndModulesPath;
171 int portNum;
172 bool conn_attempted;
173
174 MQTT_GET_IO_TRANSPORT get_io_transport;
175
176 // The current mqtt iothub implementation requires that the hub name and the domain suffix be passed as the first of a series of segments
177 // passed through the username portion of the connection frame.
178 // The second segment will contain the device id. The two segments are delemited by a "/".
179 // The first segment can be a maximum 256 characters.
180 // The second segment can be a maximum 128 characters.
181 // With the / delimeter you have 384 chars (Plus a terminator of 0).
182 STRING_HANDLE configPassedThroughUsername;
183
184 // Protocol
185 MQTT_CLIENT_HANDLE mqttClient;
186 XIO_HANDLE xioTransport;
187
188 // Session - connection
189 uint16_t packetId;
190 uint16_t twin_resp_packet_id;
191
192 // Connection state control
193 bool isRegistered;
194 MQTT_CLIENT_STATUS mqttClientStatus;
195 bool isDestroyCalled;
196 bool isRetryExpiredCallbackSet;
197 bool device_twin_get_sent;
198 bool twin_resp_sub_recv;
199 bool isRecoverableError;
200 uint16_t keepAliveValue;
201 uint16_t connect_timeout_in_sec;
202 tickcounter_ms_t mqtt_connect_time;
203 size_t connectFailCount;
204 tickcounter_ms_t connectTick;
205 bool log_trace;
206 bool raw_trace;
207 TICK_COUNTER_HANDLE msgTickCounter;
208 OPTIONHANDLER_HANDLE saved_tls_options; // Here are the options from the xio layer if any is saved.
209
210 // Internal lists for message tracking
211 PDLIST_ENTRY waitingToSend;
212 DLIST_ENTRY ack_waiting_queue;
213
214 DLIST_ENTRY pending_get_twin_queue;
215
216 // Message tracking
217 CONTROL_PACKET_TYPE currPacketState;
218
219 // Telemetry specific
220 DLIST_ENTRY telemetry_waitingForAck;
221 bool auto_url_encode_decode;
222
223 // Controls frequency of reconnection logic.
224 RETRY_CONTROL_HANDLE retry_control_handle;
225
226 // Auth module used to generating handle authorization
227 // with either SAS Token, x509 Certs, and Device SAS Token
228 IOTHUB_AUTHORIZATION_HANDLE authorization_module;
229
230 TRANSPORT_CALLBACKS_INFO transport_callbacks;
231 void* transport_ctx;
232
233 char* http_proxy_hostname;
234 int http_proxy_port;
235 char* http_proxy_username;
236 char* http_proxy_password;
237 bool isProductInfoSet;
238 int disconnect_recv_flag;
239} MQTTTRANSPORT_HANDLE_DATA, *PMQTTTRANSPORT_HANDLE_DATA;
240
241typedef struct MQTT_DEVICE_TWIN_ITEM_TAG
242{
243 tickcounter_ms_t msgEnqueueTime;
244 tickcounter_ms_t msgPublishTime;
245 size_t retryCount;
246 IOTHUB_IDENTITY_TYPE iothub_type;
247 uint16_t packet_id;
248 uint32_t iothub_msg_id;
249 IOTHUB_DEVICE_TWIN* device_twin_data;
250 DEVICE_TWIN_MSG_TYPE device_twin_msg_type;
251 DLIST_ENTRY entry;
252 IOTHUB_CLIENT_DEVICE_TWIN_CALLBACK userCallback;
253 void* userContext;
254} MQTT_DEVICE_TWIN_ITEM;
255
256typedef struct MQTT_MESSAGE_DETAILS_LIST_TAG
257{
258 tickcounter_ms_t msgPublishTime;
259 size_t retryCount;
260 IOTHUB_MESSAGE_LIST* iotHubMessageEntry;
261 void* context;
262 uint16_t packet_id;
263 DLIST_ENTRY entry;
264} MQTT_MESSAGE_DETAILS_LIST, *PMQTT_MESSAGE_DETAILS_LIST;
265
266typedef struct DEVICE_METHOD_INFO_TAG
267{
268 STRING_HANDLE request_id;
269} DEVICE_METHOD_INFO;
270
271static void free_proxy_data(MQTTTRANSPORT_HANDLE_DATA* mqtt_transport_instance)
272{
273 if (mqtt_transport_instance->http_proxy_hostname != NULL)
274 {
275 free(mqtt_transport_instance->http_proxy_hostname);
276 mqtt_transport_instance->http_proxy_hostname = NULL;
277 }
278
279 if (mqtt_transport_instance->http_proxy_username != NULL)
280 {
281 free(mqtt_transport_instance->http_proxy_username);
282 mqtt_transport_instance->http_proxy_username = NULL;
283 }
284
285 if (mqtt_transport_instance->http_proxy_password != NULL)
286 {
287 free(mqtt_transport_instance->http_proxy_password);
288 mqtt_transport_instance->http_proxy_password = NULL;
289 }
290}
291
292static void set_saved_tls_options(PMQTTTRANSPORT_HANDLE_DATA transport, OPTIONHANDLER_HANDLE new_options)
293{
294 if (transport->saved_tls_options != NULL)
295 {
296 OptionHandler_Destroy(transport->saved_tls_options);
297 }
298 transport->saved_tls_options = new_options;
299}
300
301static void free_transport_handle_data(MQTTTRANSPORT_HANDLE_DATA* transport_data)
302{
303 if (transport_data->mqttClient != NULL)
304 {
305 mqtt_client_deinit(transport_data->mqttClient);
306 }
307
308 if (transport_data->retry_control_handle != NULL)
309 {
310 retry_control_destroy(transport_data->retry_control_handle);
311 }
312
313 set_saved_tls_options(transport_data, NULL);
314
315 tickcounter_destroy(transport_data->msgTickCounter);
316
317 free_proxy_data(transport_data);
318
319 STRING_delete(transport_data->devicesAndModulesPath);
320 STRING_delete(transport_data->topic_MqttEvent);
321 STRING_delete(transport_data->topic_MqttMessage);
322 STRING_delete(transport_data->device_id);
323 STRING_delete(transport_data->module_id);
324 STRING_delete(transport_data->hostAddress);
325 STRING_delete(transport_data->configPassedThroughUsername);
326 STRING_delete(transport_data->topic_GetState);
327 STRING_delete(transport_data->topic_NotifyState);
328 STRING_delete(transport_data->topic_DeviceMethods);
329 STRING_delete(transport_data->topic_InputQueue);
330
331 free(transport_data);
332}
333
334int IoTHubTransport_MQTT_Common_SetRetryPolicy(TRANSPORT_LL_HANDLE handle, IOTHUB_CLIENT_RETRY_POLICY retryPolicy, size_t retryTimeoutLimitInSeconds)
335{
336 int result;
337
338 if (handle == NULL)
339 {
340 /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_25_041: [**If any handle is NULL then IoTHubTransport_MQTT_Common_SetRetryPolicy shall return resultant line.] */
341 LogError("Invalid handle parameter. NULL.");
342 result = MU_FAILURE;
343 }
344 else
345 {
346 RETRY_CONTROL_HANDLE new_retry_control_handle;
347
348 // Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_09_006: [ IoTHubTransport_MQTT_Common_SetRetryPolicy shall set the retry logic by calling retry_control_create() with retry policy and retryTimeout as parameters]
349 // Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_09_009: [ If retry_control_create() fails then IoTHubTransport_MQTT_Common_SetRetryPolicy shall revert to previous retry policy and return non-zero value ]
350 if ((new_retry_control_handle = retry_control_create(retryPolicy, (unsigned int)retryTimeoutLimitInSeconds)) == NULL)
351 {
352 LogError("Failed creating new retry control handle");
353 result = MU_FAILURE;
354 }
355 else
356 {
357 PMQTTTRANSPORT_HANDLE_DATA transport_data = (PMQTTTRANSPORT_HANDLE_DATA)handle;
358 RETRY_CONTROL_HANDLE previous_retry_control_handle = transport_data->retry_control_handle;
359
360 transport_data->retry_control_handle = new_retry_control_handle;
361 retry_control_destroy(previous_retry_control_handle);
362
363 /*Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_25_045: [**If retry logic for specified parameters of retry policy and retryTimeoutLimitInSeconds is created successfully then IoTHubTransport_MQTT_Common_SetRetryPolicy shall return 0]*/
364 result = 0;
365 }
366 }
367
368 return result;
369}
370
371static uint16_t get_next_packet_id(PMQTTTRANSPORT_HANDLE_DATA transport_data)
372{
373 if (transport_data->packetId + 1 >= USHRT_MAX)
374 {
375 transport_data->packetId = 1;
376 }
377 else
378 {
379 transport_data->packetId++;
380 }
381 return transport_data->packetId;
382}
383
384#ifndef NO_LOGGING
385static const char* retrieve_mqtt_return_codes(CONNECT_RETURN_CODE rtn_code)
386{
387 switch (rtn_code)
388 {
389 case CONNECTION_ACCEPTED:
390 return "Accepted";
391 case CONN_REFUSED_UNACCEPTABLE_VERSION:
392 return "Unacceptable Version";
393 case CONN_REFUSED_ID_REJECTED:
394 return "Id Rejected";
395 case CONN_REFUSED_SERVER_UNAVAIL:
396 return "Server Unavailable";
397 case CONN_REFUSED_BAD_USERNAME_PASSWORD:
398 return "Bad Username/Password";
399 case CONN_REFUSED_NOT_AUTHORIZED:
400 return "Not Authorized";
401 case CONN_REFUSED_UNKNOWN:
402 default:
403 return "Unknown";
404 }
405}
406#endif // NO_LOGGING
407
408static int retrieve_device_method_rid_info(const char* resp_topic, STRING_HANDLE method_name, STRING_HANDLE request_id)
409{
410 int result;
411 STRING_TOKENIZER_HANDLE token_handle = STRING_TOKENIZER_create_from_char(resp_topic);
412 if (token_handle == NULL)
413 {
414 LogError("Failed creating token from device twin topic.");
415 result = MU_FAILURE;
416 }
417 else
418 {
419 STRING_HANDLE token_value;
420 if ((token_value = STRING_new()) == NULL)
421 {
422 LogError("Failed allocating new string .");
423 result = MU_FAILURE;
424 }
425 else
426 {
427 size_t token_index = 0;
428 size_t request_id_length = strlen(REQUEST_ID_PROPERTY);
429 result = MU_FAILURE;
430 while (STRING_TOKENIZER_get_next_token(token_handle, token_value, "/") == 0)
431 {
432 if (token_index == 3)
433 {
434 if (STRING_concat_with_STRING(method_name, token_value) != 0)
435 {
436 LogError("Failed STRING_concat_with_STRING.");
437 result = MU_FAILURE;
438 break;
439 }
440 }
441 else if (token_index == 4)
442 {
443 if (STRING_length(token_value) >= request_id_length)
444 {
445 const char* request_id_value = STRING_c_str(token_value);
446 if (memcmp(request_id_value, REQUEST_ID_PROPERTY, request_id_length) == 0)
447 {
448 if (STRING_concat(request_id, request_id_value + request_id_length) != 0)
449 {
450 LogError("Failed STRING_concat failed.");
451 result = MU_FAILURE;
452 }
453 else
454 {
455 result = 0;
456 }
457 break;
458 }
459 }
460 }
461 token_index++;
462 }
463 STRING_delete(token_value);
464 }
465 STRING_TOKENIZER_destroy(token_handle);
466 }
467 return result;
468}
469
470static int parse_device_twin_topic_info(const char* resp_topic, bool* patch_msg, size_t* request_id, int* status_code)
471{
472 int result;
473 STRING_TOKENIZER_HANDLE token_handle = STRING_TOKENIZER_create_from_char(resp_topic);
474 if (token_handle == NULL)
475 {
476 LogError("Failed creating token from device twin topic.");
477 result = MU_FAILURE;
478 *status_code = 0;
479 *request_id = 0;
480 *patch_msg = false;
481 }
482 else
483 {
484 STRING_HANDLE token_value;
485 if ((token_value = STRING_new()) == NULL)
486 {
487 LogError("Failed allocating new string .");
488 result = MU_FAILURE;
489 *status_code = 0;
490 *request_id = 0;
491 *patch_msg = false;
492 }
493 else
494 {
495 result = MU_FAILURE;
496 size_t token_count = 0;
497 while (STRING_TOKENIZER_get_next_token(token_handle, token_value, "/") == 0)
498 {
499 if (token_count == 2)
500 {
501 if (strcmp(STRING_c_str(token_value), "PATCH") == 0)
502 {
503 *patch_msg = true;
504 *status_code = 0;
505 *request_id = 0;
506 result = 0;
507 break;
508 }
509 else
510 {
511 *patch_msg = false;
512 }
513 }
514 else if (token_count == 3)
515 {
516 *status_code = (int)atol(STRING_c_str(token_value));
517 if (STRING_TOKENIZER_get_next_token(token_handle, token_value, "/?$rid=") == 0)
518 {
519 *request_id = (size_t)atol(STRING_c_str(token_value));
520 }
521 *patch_msg = false;
522 result = 0;
523 break;
524 }
525 token_count++;
526 }
527 STRING_delete(token_value);
528 }
529 STRING_TOKENIZER_destroy(token_handle);
530 }
531 return result;
532}
533
/* ASCII-only lower-casing of one character (same mapping as the TOLOWER
 * macro, written as a function so the argument is evaluated once). */
static int InternAsciiToLower(char c)
{
    return ((c >= 'A') && (c <= 'Z')) ? (c - 'A' + 'a') : c;
}

/* Case-insensitive (ASCII) comparison of at most n characters.
 *
 * Returns 0 when the first n characters match, or when both strings end at
 * the same position before n characters were compared.
 * Returns a negative value when s1 is NULL, when s1 ends first, or when s1
 * orders before s2 at the first difference.
 * Returns a positive value when s2 is NULL (and s1 is not), when s2 ends
 * first, or when s1 orders after s2. */
static int InternStrnicmp(const char* s1, const char* s2, size_t n)
{
    int result;

    if (s1 == NULL)
    {
        result = -1;
    }
    else if (s2 == NULL)
    {
        result = 1;
    }
    else
    {
        result = 0;

        for (; n > 0; n--, s1++, s2++)
        {
            if (*s1 == 0)
            {
                // Both strings ending together means they are equal, not "s1 shorter".
                result = (*s2 == 0) ? 0 : -1;
                break;
            }
            if (*s2 == 0)
            {
                result = 1;
                break;
            }

            result = InternAsciiToLower(*s1) - InternAsciiToLower(*s2);
            if (result != 0)
            {
                break;
            }
        }
    }
    return result;
}
559
560static IOTHUB_IDENTITY_TYPE retrieve_topic_type(const char* topic_resp, const char* input_queue)
561{
562 IOTHUB_IDENTITY_TYPE type;
563 if (InternStrnicmp(topic_resp, TOPIC_DEVICE_TWIN_PREFIX, sizeof(TOPIC_DEVICE_TWIN_PREFIX) - 1) == 0)
564 {
565 type = IOTHUB_TYPE_DEVICE_TWIN;
566 }
567 else if (InternStrnicmp(topic_resp, TOPIC_DEVICE_METHOD_PREFIX, sizeof(TOPIC_DEVICE_METHOD_PREFIX) - 1) == 0)
568 {
569 type = IOTHUB_TYPE_DEVICE_METHODS;
570 }
571 // input_queue contains additional "#" from subscribe, which we strip off on comparing incoming.
572 else if ((input_queue != NULL) && InternStrnicmp(topic_resp, input_queue, strlen(input_queue) - 1) == 0)
573 {
574 type = IOTHUB_TYPE_EVENT_QUEUE;
575 }
576 else
577 {
578 type = IOTHUB_TYPE_TELEMETRY;
579 }
580 return type;
581
582}
583
584static void sendMsgComplete(IOTHUB_MESSAGE_LIST* iothubMsgList, PMQTTTRANSPORT_HANDLE_DATA transport_data, IOTHUB_CLIENT_CONFIRMATION_RESULT confirmResult)
585{
586 DLIST_ENTRY messageCompleted;
587 DList_InitializeListHead(&messageCompleted);
588 DList_InsertTailList(&messageCompleted, &(iothubMsgList->entry));
589 transport_data->transport_callbacks.send_complete_cb(&messageCompleted, confirmResult, transport_data->transport_ctx);
590}
591
592static int addUserPropertiesTouMqttMessage(IOTHUB_MESSAGE_HANDLE iothub_message_handle, STRING_HANDLE topic_string, size_t* index_ptr, bool urlencode)
593{
594 int result = 0;
595 const char* const* propertyKeys;
596 const char* const* propertyValues;
597 size_t propertyCount;
598 size_t index = *index_ptr;
599 MAP_HANDLE properties_map = IoTHubMessage_Properties(iothub_message_handle);
600 if (properties_map != NULL)
601 {
602 if (Map_GetInternals(properties_map, &propertyKeys, &propertyValues, &propertyCount) != MAP_OK)
603 {
604 LogError("Failed to get the internals of the property map.");
605 result = MU_FAILURE;
606 }
607 else
608 {
609 if (propertyCount != 0)
610 {
611 for (index = 0; index < propertyCount && result == 0; index++)
612 {
613 if (urlencode)
614 {
615 STRING_HANDLE property_key = URL_EncodeString(propertyKeys[index]);
616 STRING_HANDLE property_value = URL_EncodeString(propertyValues[index]);
617 if ((property_key == NULL) || (property_value == NULL))
618 {
619 LogError("Failed URL Encoding properties");
620 result = MU_FAILURE;
621 }
622 else if (STRING_sprintf(topic_string, "%s=%s%s", STRING_c_str(property_key), STRING_c_str(property_value), propertyCount - 1 == index ? "" : PROPERTY_SEPARATOR) != 0)
623 {
624 LogError("Failed constructing property string.");
625 result = MU_FAILURE;
626 }
627 STRING_delete(property_key);
628 STRING_delete(property_value);
629 }
630 else
631 {
632 if (STRING_sprintf(topic_string, "%s=%s%s", propertyKeys[index], propertyValues[index], propertyCount - 1 == index ? "" : PROPERTY_SEPARATOR) != 0)
633 {
634 LogError("Failed constructing property string.");
635 result = MU_FAILURE;
636 }
637 }
638 }
639 }
640 }
641 }
642 *index_ptr = index;
643 return result;
644}
645
646static int addSystemPropertyToTopicString(STRING_HANDLE topic_string, size_t index, const char* property_key, const char* property_value, bool urlencode)
647{
648 int result = 0;
649
650 if (urlencode)
651 {
652 STRING_HANDLE encoded_property_value = URL_EncodeString(property_value);
653 if (encoded_property_value == NULL)
654 {
655 LogError("Failed URL encoding %s.", property_key);
656 result = MU_FAILURE;
657 }
658 else if (STRING_sprintf(topic_string, "%s%%24.%s=%s", index == 0 ? "" : PROPERTY_SEPARATOR, property_key, STRING_c_str(encoded_property_value)) != 0)
659 {
660 LogError("Failed setting %s.", property_key);
661 result = MU_FAILURE;
662 }
663 STRING_delete(encoded_property_value);
664 }
665 else
666 {
667 if (STRING_sprintf(topic_string, "%s%%24.%s=%s", index == 0 ? "" : PROPERTY_SEPARATOR, property_key, property_value) != 0)
668 {
669 LogError("Failed setting %s.", property_key);
670 result = MU_FAILURE;
671 }
672 }
673 return result;
674}
675
676static int addSystemPropertiesTouMqttMessage(IOTHUB_MESSAGE_HANDLE iothub_message_handle, STRING_HANDLE topic_string, size_t* index_ptr, bool urlencode)
677{
678 (void)urlencode;
679 int result = 0;
680 size_t index = *index_ptr;
681
682 /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_07_052: [ IoTHubTransport_MQTT_Common_DoWork shall check for the CorrelationId property and if found add the value as a system property in the format of $.cid=<id> ] */
683 const char* correlation_id = IoTHubMessage_GetCorrelationId(iothub_message_handle);
684 if (correlation_id != NULL)
685 {
686 result = addSystemPropertyToTopicString(topic_string, index, CORRELATION_ID_PROPERTY, correlation_id, urlencode);
687 index++;
688 }
689 /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_07_053: [ IoTHubTransport_MQTT_Common_DoWork shall check for the MessageId property and if found add the value as a system property in the format of $.mid=<id> ] */
690 if (result == 0)
691 {
692 const char* msg_id = IoTHubMessage_GetMessageId(iothub_message_handle);
693 if (msg_id != NULL)
694 {
695 result = addSystemPropertyToTopicString(topic_string, index, MESSAGE_ID_PROPERTY, msg_id, urlencode);
696 index++;
697 }
698 }
699 // Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_09_010: [ `IoTHubTransport_MQTT_Common_DoWork` shall check for the ContentType property and if found add the `value` as a system property in the format of `$.ct=<value>` ]
700 if (result == 0)
701 {
702 const char* content_type = IoTHubMessage_GetContentTypeSystemProperty(iothub_message_handle);
703 if (content_type != NULL)
704 {
705 result = addSystemPropertyToTopicString(topic_string, index, CONTENT_TYPE_PROPERTY, content_type, urlencode);
706 index++;
707 }
708 }
709 // Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_09_011: [ `IoTHubTransport_MQTT_Common_DoWork` shall check for the ContentEncoding property and if found add the `value` as a system property in the format of `$.ce=<value>` ]
710 if (result == 0)
711 {
712 const char* content_encoding = IoTHubMessage_GetContentEncodingSystemProperty(iothub_message_handle);
713 if (content_encoding != NULL)
714 {
715 result = addSystemPropertyToTopicString(topic_string, index, CONTENT_ENCODING_PROPERTY, content_encoding, urlencode);
716 index++;
717 }
718 }
719 *index_ptr = index;
720 return result;
721}
722
723static int addDiagnosticPropertiesTouMqttMessage(IOTHUB_MESSAGE_HANDLE iothub_message_handle, STRING_HANDLE topic_string, size_t* index_ptr)
724{
725 int result = 0;
726 size_t index = *index_ptr;
727
728 // Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_09_014: [ `IoTHubTransport_MQTT_Common_DoWork` shall check for the diagnostic properties including diagid and diagCreationTimeUtc and if found both add them as system property in the format of `$.diagid` and `$.diagctx` respectively]
729 const IOTHUB_MESSAGE_DIAGNOSTIC_PROPERTY_DATA* diagnosticData = IoTHubMessage_GetDiagnosticPropertyData(iothub_message_handle);
730 if (diagnosticData != NULL)
731 {
732 const char* diag_id = diagnosticData->diagnosticId;
733 const char* creation_time_utc = diagnosticData->diagnosticCreationTimeUtc;
734 //diagid and creationtimeutc must be present/unpresent simultaneously
735 if (diag_id != NULL && creation_time_utc != NULL)
736 {
737 if (STRING_sprintf(topic_string, "%s%%24.%s=%s", index == 0 ? "" : PROPERTY_SEPARATOR, DIAGNOSTIC_ID_PROPERTY, diag_id) != 0)
738 {
739 LogError("Failed setting diagnostic id");
740 result = MU_FAILURE;
741 }
742 index++;
743
744 if (result == 0)
745 {
746 //construct diagnostic context, it should be urlencode(key1=value1,key2=value2)
747 STRING_HANDLE diagContextHandle = STRING_construct_sprintf("%s=%s", DIAGNOSTIC_CONTEXT_CREATION_TIME_UTC_PROPERTY, creation_time_utc);
748 if (diagContextHandle == NULL)
749 {
750 LogError("Failed constructing diagnostic context");
751 result = MU_FAILURE;
752 }
753 else
754 {
755 //Add other diagnostic context properties here if have more
756 STRING_HANDLE encodedContextValueHandle = URL_Encode(diagContextHandle);
757 const char* encodedContextValueString = NULL;
758 if (encodedContextValueHandle != NULL &&
759 (encodedContextValueString = STRING_c_str(encodedContextValueHandle)) != NULL)
760 {
761 if (STRING_sprintf(topic_string, "%s%%24.%s=%s", index == 0 ? "" : PROPERTY_SEPARATOR, DIAGNOSTIC_CONTEXT_PROPERTY, encodedContextValueString) != 0)
762 {
763 LogError("Failed setting diagnostic context");
764 result = MU_FAILURE;
765 }
766 STRING_delete(encodedContextValueHandle);
767 encodedContextValueHandle = NULL;
768 }
769 else
770 {
771 LogError("Failed encoding diagnostic context value");
772 result = MU_FAILURE;
773 }
774 STRING_delete(diagContextHandle);
775 diagContextHandle = NULL;
776 index++;
777 }
778 }
779 }
780 else if (diag_id != NULL || creation_time_utc != NULL)
781 {
782 // Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_09_015: [ `IoTHubTransport_MQTT_Common_DoWork` shall check whether diagid and diagCreationTimeUtc be present simultaneously, treat as error if not]
783 LogError("diagid and diagcreationtimeutc must be present simultaneously.");
784 result = MU_FAILURE;
785 }
786 }
787 return result;
788}
789
790
791static STRING_HANDLE addPropertiesTouMqttMessage(IOTHUB_MESSAGE_HANDLE iothub_message_handle, const char* eventTopic, bool urlencode)
792{
793 size_t index = 0;
794 STRING_HANDLE result = STRING_construct(eventTopic);
795 if (result == NULL)
796 {
797 LogError("Failed to create event topic string handle");
798 }
799 else if (addUserPropertiesTouMqttMessage(iothub_message_handle, result, &index, urlencode) != 0)
800 {
801 LogError("Failed adding Properties to uMQTT Message");
802 STRING_delete(result);
803 result = NULL;
804 }
805 else if (addSystemPropertiesTouMqttMessage(iothub_message_handle, result, &index, urlencode) != 0)
806 {
807 LogError("Failed adding System Properties to uMQTT Message");
808 STRING_delete(result);
809 result = NULL;
810 }
811 else if (addDiagnosticPropertiesTouMqttMessage(iothub_message_handle, result, &index) != 0)
812 {
813 LogError("Failed adding Diagnostic Properties to uMQTT Message");
814 STRING_delete(result);
815 result = NULL;
816 }
817
818 // Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_31_060: [ `IoTHubTransport_MQTT_Common_DoWork` shall check for the OutputName property and if found add the value as a system property in the format of $.on=<value> ]
819 if (result != NULL)
820 {
821 const char* output_name = IoTHubMessage_GetOutputName(iothub_message_handle);
822 if (output_name != NULL)
823 {
824 if (STRING_sprintf(result, "%s%%24.on=%s/", index == 0 ? "" : PROPERTY_SEPARATOR, output_name) != 0)
825 {
826 LogError("Failed setting output name.");
827 STRING_delete(result);
828 result = NULL;
829 }
830 index++;
831 }
832 }
833
834 return result;
835}
836
837static int publish_mqtt_telemetry_msg(PMQTTTRANSPORT_HANDLE_DATA transport_data, MQTT_MESSAGE_DETAILS_LIST* mqttMsgEntry, const unsigned char* payload, size_t len)
838{
839 int result;
840 STRING_HANDLE msgTopic = addPropertiesTouMqttMessage(mqttMsgEntry->iotHubMessageEntry->messageHandle, STRING_c_str(transport_data->topic_MqttEvent), transport_data->auto_url_encode_decode);
841 if (msgTopic == NULL)
842 {
843 LogError("Failed adding properties to mqtt message");
844 result = MU_FAILURE;
845 }
846 else
847 {
848 MQTT_MESSAGE_HANDLE mqttMsg = mqttmessage_create_in_place(mqttMsgEntry->packet_id, STRING_c_str(msgTopic), DELIVER_AT_LEAST_ONCE, payload, len);
849 if (mqttMsg == NULL)
850 {
851 LogError("Failed creating mqtt message");
852 result = MU_FAILURE;
853 }
854 else
855 {
856 if (tickcounter_get_current_ms(transport_data->msgTickCounter, &mqttMsgEntry->msgPublishTime) != 0)
857 {
858 LogError("Failed retrieving tickcounter info");
859 result = MU_FAILURE;
860 }
861 else
862 {
863 if (mqtt_client_publish(transport_data->mqttClient, mqttMsg) != 0)
864 {
865 LogError("Failed attempting to publish mqtt message");
866 result = MU_FAILURE;
867 }
868 else
869 {
870 mqttMsgEntry->retryCount++;
871 result = 0;
872 }
873 }
874 mqttmessage_destroy(mqttMsg);
875 }
876 STRING_delete(msgTopic);
877 }
878 return result;
879}
880
881static int publish_device_method_message(MQTTTRANSPORT_HANDLE_DATA* transport_data, int status_code, STRING_HANDLE request_id, const unsigned char* response, size_t response_size)
882{
883 int result;
884 uint16_t packet_id = get_next_packet_id(transport_data);
885
886 STRING_HANDLE msg_topic = STRING_construct_sprintf(DEVICE_METHOD_RESPONSE_TOPIC, status_code, STRING_c_str(request_id));
887 if (msg_topic == NULL)
888 {
889 LogError("Failed constructing message topic.");
890 result = MU_FAILURE;
891 }
892 else
893 {
894 MQTT_MESSAGE_HANDLE mqtt_get_msg = mqttmessage_create_in_place(packet_id, STRING_c_str(msg_topic), DELIVER_AT_MOST_ONCE, response, response_size);
895 if (mqtt_get_msg == NULL)
896 {
897 LogError("Failed constructing mqtt message.");
898 result = MU_FAILURE;
899 }
900 else
901 {
902 if (mqtt_client_publish(transport_data->mqttClient, mqtt_get_msg) != 0)
903 {
904 LogError("Failed publishing to mqtt client.");
905 result = MU_FAILURE;
906 }
907 else
908 {
909 result = 0;
910 }
911 mqttmessage_destroy(mqtt_get_msg);
912 }
913 STRING_delete(msg_topic);
914 }
915 return result;
916}
917
918
919static void destroy_device_twin_get_message(MQTT_DEVICE_TWIN_ITEM* msg_entry)
920{
921 free(msg_entry);
922}
923
924static MQTT_DEVICE_TWIN_ITEM* create_device_twin_get_message(MQTTTRANSPORT_HANDLE_DATA* transport_data)
925{
926 MQTT_DEVICE_TWIN_ITEM* result;
927
928 if ((result = (MQTT_DEVICE_TWIN_ITEM*)malloc(sizeof(MQTT_DEVICE_TWIN_ITEM))) == NULL)
929 {
930 LogError("Failed allocating device twin data.");
931 }
932 else
933 {
934 result->packet_id = get_next_packet_id(transport_data);
935 result->iothub_msg_id = 0;
936 result->device_twin_msg_type = RETRIEVE_PROPERTIES;
937 result->retryCount = 0;
938 result->msgPublishTime = 0;
939 result->msgEnqueueTime = 0;
940 result->iothub_type = IOTHUB_TYPE_DEVICE_TWIN;
941 result->device_twin_data = NULL;
942 result->userCallback = NULL;
943 result->userContext = NULL;
944 }
945
946 return result;
947}
948
949static int publish_device_twin_get_message(MQTTTRANSPORT_HANDLE_DATA* transport_data, MQTT_DEVICE_TWIN_ITEM* mqtt_info)
950{
951 int result;
952
953 STRING_HANDLE msg_topic = STRING_construct_sprintf(GET_PROPERTIES_TOPIC, mqtt_info->packet_id);
954 if (msg_topic == NULL)
955 {
956 LogError("Failed constructing get Prop topic.");
957 result = MU_FAILURE;
958 }
959 else
960 {
961 MQTT_MESSAGE_HANDLE mqtt_get_msg = mqttmessage_create(mqtt_info->packet_id, STRING_c_str(msg_topic), DELIVER_AT_MOST_ONCE, NULL, 0);
962 if (mqtt_get_msg == NULL)
963 {
964 LogError("Failed constructing mqtt message.");
965 result = MU_FAILURE;
966 }
967 else
968 {
969 if (mqtt_client_publish(transport_data->mqttClient, mqtt_get_msg) != 0)
970 {
971 LogError("Failed publishing to mqtt client.");
972 result = MU_FAILURE;
973 }
974 else
975 {
976 DList_InsertTailList(&transport_data->ack_waiting_queue, &mqtt_info->entry);
977 result = 0;
978 }
979 mqttmessage_destroy(mqtt_get_msg);
980 }
981 STRING_delete(msg_topic);
982 }
983
984 return result;
985}
986
987static void sendPendingGetTwinRequests(PMQTTTRANSPORT_HANDLE_DATA transportData)
988{
989 PDLIST_ENTRY dev_twin_item = transportData->pending_get_twin_queue.Flink;
990
991 while (dev_twin_item != &transportData->pending_get_twin_queue)
992 {
993 DLIST_ENTRY saveListEntry;
994 saveListEntry.Flink = dev_twin_item->Flink;
995 MQTT_DEVICE_TWIN_ITEM* msg_entry = containingRecord(dev_twin_item, MQTT_DEVICE_TWIN_ITEM, entry);
996 (void)DList_RemoveEntryList(dev_twin_item);
997
998 if (publish_device_twin_get_message(transportData, msg_entry) != 0)
999 {
1000 LogError("Failed sending pending get twin request");
1001 destroy_device_twin_get_message(msg_entry);
1002 }
1003 else
1004 {
1005 transportData->device_twin_get_sent = true;
1006 }
1007
1008 dev_twin_item = saveListEntry.Flink;
1009 }
1010}
1011
1012static void removeExpiredPendingGetTwinRequests(PMQTTTRANSPORT_HANDLE_DATA transport_data)
1013{
1014 tickcounter_ms_t current_ms;
1015
1016 if (tickcounter_get_current_ms(transport_data->msgTickCounter, &current_ms) == 0)
1017 {
1018 PDLIST_ENTRY listItem = transport_data->pending_get_twin_queue.Flink;
1019
1020 while (listItem != &transport_data->pending_get_twin_queue)
1021 {
1022 DLIST_ENTRY nextListItem;
1023 nextListItem.Flink = listItem->Flink;
1024 MQTT_DEVICE_TWIN_ITEM* msg_entry = containingRecord(listItem, MQTT_DEVICE_TWIN_ITEM, entry);
1025
1026 if (((current_ms - msg_entry->msgEnqueueTime) / 1000) >= ON_DEMAND_GET_TWIN_REQUEST_TIMEOUT_SECS)
1027 {
1028 (void)DList_RemoveEntryList(listItem);
1029 msg_entry->userCallback(DEVICE_TWIN_UPDATE_COMPLETE, NULL, 0, msg_entry->userContext);
1030 destroy_device_twin_get_message(msg_entry);
1031 }
1032
1033 listItem = nextListItem.Flink;
1034 }
1035 }
1036}
1037
1038static void removeExpiredGetTwinRequestsPendingAck(PMQTTTRANSPORT_HANDLE_DATA transport_data)
1039{
1040 tickcounter_ms_t current_ms;
1041
1042 if (tickcounter_get_current_ms(transport_data->msgTickCounter, &current_ms) == 0)
1043 {
1044 PDLIST_ENTRY listItem = transport_data->ack_waiting_queue.Flink;
1045
1046 while (listItem != &transport_data->ack_waiting_queue)
1047 {
1048 DLIST_ENTRY nextListItem;
1049 nextListItem.Flink = listItem->Flink;
1050 MQTT_DEVICE_TWIN_ITEM* msg_entry = containingRecord(listItem, MQTT_DEVICE_TWIN_ITEM, entry);
1051
1052 // Check if it is a on-demand get-twin request.
1053 if (msg_entry->device_twin_msg_type == RETRIEVE_PROPERTIES && msg_entry->userCallback != NULL)
1054 {
1055 if (((current_ms - msg_entry->msgEnqueueTime) / 1000) >= ON_DEMAND_GET_TWIN_REQUEST_TIMEOUT_SECS)
1056 {
1057 (void)DList_RemoveEntryList(listItem);
1058 msg_entry->userCallback(DEVICE_TWIN_UPDATE_COMPLETE, NULL, 0, msg_entry->userContext);
1059 destroy_device_twin_get_message(msg_entry);
1060 }
1061 }
1062
1063 listItem = nextListItem.Flink;
1064 }
1065 }
1066}
1067
1068static int publish_device_twin_message(MQTTTRANSPORT_HANDLE_DATA* transport_data, IOTHUB_DEVICE_TWIN* device_twin_info, MQTT_DEVICE_TWIN_ITEM* mqtt_info)
1069{
1070 int result;
1071 mqtt_info->packet_id = get_next_packet_id(transport_data);
1072 mqtt_info->device_twin_msg_type = REPORTED_STATE;
1073
1074 STRING_HANDLE msgTopic = STRING_construct_sprintf(REPORTED_PROPERTIES_TOPIC, mqtt_info->packet_id);
1075 if (msgTopic == NULL)
1076 {
1077 LogError("Failed constructing reported prop topic.");
1078 result = MU_FAILURE;
1079 }
1080 else
1081 {
1082 const CONSTBUFFER* data_buff;
1083 if ((data_buff = CONSTBUFFER_GetContent(device_twin_info->report_data_handle)) == NULL)
1084 {
1085 LogError("Failed retrieving buffer content");
1086 result = MU_FAILURE;
1087 }
1088 else
1089 {
1090 MQTT_MESSAGE_HANDLE mqtt_rpt_msg = mqttmessage_create_in_place(mqtt_info->packet_id, STRING_c_str(msgTopic), DELIVER_AT_MOST_ONCE, data_buff->buffer, data_buff->size);
1091 if (mqtt_rpt_msg == NULL)
1092 {
1093 LogError("Failed creating mqtt message");
1094 result = MU_FAILURE;
1095 }
1096 else
1097 {
1098 if (tickcounter_get_current_ms(transport_data->msgTickCounter, &mqtt_info->msgPublishTime) != 0)
1099 {
1100 LogError("Failed retrieving tickcounter info");
1101 result = MU_FAILURE;
1102 }
1103 else
1104 {
1105 if (mqtt_client_publish(transport_data->mqttClient, mqtt_rpt_msg) != 0)
1106 {
1107 LogError("Failed publishing mqtt message");
1108 result = MU_FAILURE;
1109 }
1110 else
1111 {
1112 mqtt_info->retryCount++;
1113 result = 0;
1114 }
1115 }
1116 mqttmessage_destroy(mqtt_rpt_msg);
1117 }
1118 }
1119 STRING_delete(msgTopic);
1120 }
1121 return result;
1122}
1123
1124static void changeStateToSubscribeIfAllowed(PMQTTTRANSPORT_HANDLE_DATA transport_data)
1125{
1126 if (transport_data->currPacketState != CONNACK_TYPE &&
1127 transport_data->currPacketState != CONNECT_TYPE &&
1128 transport_data->currPacketState != DISCONNECT_TYPE &&
1129 transport_data->currPacketState != PACKET_TYPE_ERROR)
1130 {
1131 transport_data->currPacketState = SUBSCRIBE_TYPE;
1132 }
1133}
1134
1135static int subscribeToNotifyStateIfNeeded(PMQTTTRANSPORT_HANDLE_DATA transport_data)
1136{
1137 int result;
1138
1139 if (transport_data->topic_NotifyState == NULL)
1140 {
1141 transport_data->topic_NotifyState = STRING_construct(TOPIC_NOTIFICATION_STATE);
1142 if (transport_data->topic_NotifyState == NULL)
1143 {
1144 LogError("Failure: unable constructing notify state topic");
1145 result = MU_FAILURE;
1146 }
1147 else
1148 {
1149 transport_data->topics_ToSubscribe |= SUBSCRIBE_NOTIFICATION_STATE_TOPIC;
1150 result = 0;
1151 }
1152 }
1153 else
1154 {
1155 result = 0;
1156 }
1157
1158 if (result == 0)
1159 {
1160 changeStateToSubscribeIfAllowed(transport_data);
1161 }
1162
1163 return result;
1164}
1165
1166
1167static bool isSystemProperty(const char* tokenData)
1168{
1169 bool result = false;
1170 size_t propCount = sizeof(sysPropList) / sizeof(sysPropList[0]);
1171 size_t index = 0;
1172 for (index = 0; index < propCount; index++)
1173 {
1174 if (memcmp(tokenData, sysPropList[index].propName, sysPropList[index].propLength) == 0)
1175 {
1176 result = true;
1177 break;
1178 }
1179 }
1180 return result;
1181}
1182
1183// Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_31_061: [ If the message is sent to an input queue, `IoTHubTransport_MQTT_Common_DoWork` shall parse out to the input queue name and store it in the message with IoTHubMessage_SetInputName ]
1184// Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_31_062: [ If IoTHubTransport_MQTT_Common_DoWork receives a malformatted inputQueue, it shall fail ]
1185static int addInputNamePropertyToMessage(IOTHUB_MESSAGE_HANDLE IoTHubMessage, const char* topic_name)
1186{
1187 int result = MU_FAILURE;
1188 int number_tokens_read = 0;
1189
1190 STRING_TOKENIZER_HANDLE token_handle = STRING_TOKENIZER_create_from_char(topic_name);
1191 if (token_handle == NULL)
1192 {
1193 LogError("STRING_TOKENIZER_create_from_char failed\n");
1194 result = MU_FAILURE;
1195 }
1196 else
1197 {
1198 STRING_HANDLE token_value;
1199 if ((token_value = STRING_new()) == NULL)
1200 {
1201 LogError("Failed allocating token_value");
1202 }
1203 else
1204 {
1205 while (STRING_TOKENIZER_get_next_token(token_handle, token_value, "/") == 0)
1206 {
1207 number_tokens_read++;
1208 if (number_tokens_read == (slashes_to_reach_input_name + 1))
1209 {
1210 if ((IOTHUB_MESSAGE_OK != IoTHubMessage_SetInputName(IoTHubMessage, STRING_c_str(token_value))))
1211 {
1212 LogError("Failed adding input name to msg");
1213 result = MU_FAILURE;
1214 }
1215 else
1216 {
1217 result = 0;
1218 }
1219 break;
1220 }
1221 }
1222 }
1223 STRING_delete(token_value);
1224
1225 if (number_tokens_read != (slashes_to_reach_input_name + 1))
1226 {
1227 LogError("Not enough '/' to contain input name. Got %d, need at least %d", number_tokens_read, (slashes_to_reach_input_name + 1));
1228 result = MU_FAILURE;
1229 }
1230 STRING_TOKENIZER_destroy(token_handle);
1231 }
1232
1233 return result;
1234}
1235
1236static int setMqttMessagePropertyIfPossible(IOTHUB_MESSAGE_HANDLE IoTHubMessage, const char* propName, const char* propValue, size_t nameLen)
1237{
1238 // Not finding a system property to map to isn't an error.
1239 int result = 0;
1240
1241 if (nameLen > 4)
1242 {
1243 if (strcmp((const char*)&propName[nameLen - 4], CONNECTION_DEVICE_ID) == 0)
1244 {
1245 // Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_31_063: [ If type is IOTHUB_TYPE_TELEMETRY and the system property `$.cdid` is defined, its value shall be set on the IOTHUB_MESSAGE_HANDLE's ConnectionDeviceId property ]
1246 if (IoTHubMessage_SetConnectionDeviceId(IoTHubMessage, propValue) != IOTHUB_MESSAGE_OK)
1247 {
1248 LogError("Failed to set IOTHUB_MESSAGE_HANDLE 'messageId' property.");
1249 result = MU_FAILURE;
1250 }
1251 return result;
1252 }
1253 if (strcmp((const char*)&propName[nameLen - 4], CONNECTION_MODULE_ID_PROPERTY) == 0)
1254 {
1255 // Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_31_064: [ If type is IOTHUB_TYPE_TELEMETRY and the system property `$.cmid` is defined, its value shall be set on the IOTHUB_MESSAGE_HANDLE's ConnectionModuleId property ]
1256 if (IoTHubMessage_SetConnectionModuleId(IoTHubMessage, propValue) != IOTHUB_MESSAGE_OK)
1257 {
1258 LogError("Failed to set IOTHUB_MESSAGE_HANDLE 'correlationId' property.");
1259 result = MU_FAILURE;
1260 }
1261 return result;
1262 }
1263 }
1264 if (nameLen > 3)
1265 {
1266 if (strcmp((const char*)&propName[nameLen - 3], MESSAGE_ID_PROPERTY) == 0)
1267 {
1268 if (IoTHubMessage_SetMessageId(IoTHubMessage, propValue) != IOTHUB_MESSAGE_OK)
1269 {
1270 LogError("Failed to set IOTHUB_MESSAGE_HANDLE 'messageId' property.");
1271 result = MU_FAILURE;
1272 }
1273 return result;
1274 }
1275 else if (strcmp((const char*)&propName[nameLen - 3], CORRELATION_ID_PROPERTY) == 0)
1276 {
1277 if (IoTHubMessage_SetCorrelationId(IoTHubMessage, propValue) != IOTHUB_MESSAGE_OK)
1278 {
1279 LogError("Failed to set IOTHUB_MESSAGE_HANDLE 'correlationId' property.");
1280 result = MU_FAILURE;
1281 }
1282 return result;
1283 }
1284 }
1285
1286 if (nameLen > 2)
1287 {
1288 // Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_09_012: [ If type is IOTHUB_TYPE_TELEMETRY and the system property `$.ct` is defined, its value shall be set on the IOTHUB_MESSAGE_HANDLE's ContentType property ]
1289 if (strcmp((const char*)&propName[nameLen - 2], CONTENT_TYPE_PROPERTY) == 0)
1290 {
1291 if (IoTHubMessage_SetContentTypeSystemProperty(IoTHubMessage, propValue) != IOTHUB_MESSAGE_OK)
1292 {
1293 LogError("Failed to set IOTHUB_MESSAGE_HANDLE 'customContentType' property.");
1294 result = MU_FAILURE;
1295 }
1296 return result;
1297 }
1298 // Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_09_013: [ If type is IOTHUB_TYPE_TELEMETRY and the system property `$.ce` is defined, its value shall be set on the IOTHUB_MESSAGE_HANDLE's ContentEncoding property ]
1299 else if (strcmp((const char*)&propName[nameLen - 2], CONTENT_ENCODING_PROPERTY) == 0)
1300 {
1301 if (IoTHubMessage_SetContentEncodingSystemProperty(IoTHubMessage, propValue) != IOTHUB_MESSAGE_OK)
1302 {
1303 LogError("Failed to set IOTHUB_MESSAGE_HANDLE 'contentEncoding' property.");
1304 result = MU_FAILURE;
1305 }
1306 return result;
1307 }
1308 }
1309
1310 return result;
1311}
1312
1313static int extractMqttProperties(IOTHUB_MESSAGE_HANDLE IoTHubMessage, const char* topic_name, bool urldecode)
1314{
1315 int result;
1316 STRING_TOKENIZER_HANDLE token = STRING_TOKENIZER_create_from_char(topic_name);
1317 if (token != NULL)
1318 {
1319 MAP_HANDLE propertyMap = IoTHubMessage_Properties(IoTHubMessage);
1320 if (propertyMap == NULL)
1321 {
1322 LogError("Failure to retrieve IoTHubMessage_properties.");
1323 result = MU_FAILURE;
1324 }
1325 else
1326 {
1327 STRING_HANDLE output = STRING_new();
1328 if (output == NULL)
1329 {
1330 LogError("Failure to allocate STRING_new.");
1331 result = MU_FAILURE;
1332 }
1333 else
1334 {
1335 result = 0;
1336
1337 while (STRING_TOKENIZER_get_next_token(token, output, PROPERTY_SEPARATOR) == 0 && result == 0)
1338 {
1339 const char* tokenData = STRING_c_str(output);
1340 size_t tokenLen = strlen(tokenData);
1341 if (tokenData == NULL || tokenLen == 0)
1342 {
1343 break;
1344 }
1345 else
1346 {
1347 if (isSystemProperty(tokenData))
1348 {
1349 const char* iterator = tokenData;
1350 while (iterator != NULL && *iterator != '\0' && result == 0)
1351 {
1352 if (*iterator == '=')
1353 {
1354 char* propName = NULL;
1355 char* propValue = NULL;
1356 size_t nameLen = iterator - tokenData;
1357 size_t valLen = tokenLen - (nameLen + 1) + 1;
1358
1359 if ((propName = malloc(nameLen + 1)) == NULL || (propValue = malloc(valLen + 1)) == NULL)
1360 {
1361 LogError("Failed allocating property name (%p) and/or value (%p)", propName, propValue);
1362 free(propName);
1363 result = MU_FAILURE;
1364 }
1365 else
1366 {
1367 memcpy(propName, tokenData, nameLen);
1368 propName[nameLen] = '\0';
1369
1370 memcpy(propValue, iterator + 1, valLen);
1371 propValue[valLen] = '\0';
1372
1373 if (urldecode)
1374 {
1375 STRING_HANDLE propValue_decoded;
1376 if ((propValue_decoded = URL_DecodeString(propValue)) == NULL)
1377 {
1378 LogError("Failed to URL decode property value");
1379 result = MU_FAILURE;
1380 }
1381 else if (setMqttMessagePropertyIfPossible(IoTHubMessage, propName, STRING_c_str(propValue_decoded), nameLen) != 0)
1382 {
1383 LogError("Unable to set message property");
1384 result = MU_FAILURE;
1385 }
1386 STRING_delete(propValue_decoded);
1387 }
1388 else
1389 {
1390 if (setMqttMessagePropertyIfPossible(IoTHubMessage, propName, propValue, nameLen) != 0)
1391 {
1392 LogError("Unable to set message property");
1393 result = MU_FAILURE;
1394 }
1395 }
1396 free(propName);
1397 free(propValue);
1398 }
1399 break;
1400 }
1401 iterator++;
1402 }
1403 }
1404 else //User Properties
1405 {
1406 const char* iterator = tokenData;
1407 while (iterator != NULL && *iterator != '\0' && result == 0)
1408 {
1409 if (*iterator == '=')
1410 {
1411 char* propName = NULL;
1412 char* propValue = NULL;
1413 size_t nameLen = iterator - tokenData;
1414 size_t valLen = tokenLen - (nameLen + 1) + 1;
1415
1416 if ((propName = (char*)malloc(nameLen + 1)) == NULL || (propValue = (char*)malloc(valLen + 1)) == NULL)
1417 {
1418 free(propName);
1419 LogError("Failed allocating property information");
1420 result = MU_FAILURE;
1421 }
1422 else
1423 {
1424 memcpy(propName, tokenData, nameLen);
1425 propName[nameLen] = '\0';
1426
1427 memcpy(propValue, iterator + 1, valLen);
1428 propValue[valLen] = '\0';
1429
1430 if (urldecode)
1431 {
1432 STRING_HANDLE propName_decoded = URL_DecodeString(propName);
1433 STRING_HANDLE propValue_decoded = URL_DecodeString(propValue);
1434 if (propName_decoded == NULL || propValue_decoded == NULL)
1435 {
1436 LogError("Failed to URL decode property");
1437 result = MU_FAILURE;
1438 }
1439 else if (Map_AddOrUpdate(propertyMap, STRING_c_str(propName_decoded), STRING_c_str(propValue_decoded)) != MAP_OK)
1440 {
1441 LogError("Map_AddOrUpdate failed.");
1442 result = MU_FAILURE;
1443 }
1444 STRING_delete(propName_decoded);
1445 STRING_delete(propValue_decoded);
1446 }
1447 else
1448 {
1449 if (Map_AddOrUpdate(propertyMap, propName, propValue) != MAP_OK)
1450 {
1451 LogError("Map_AddOrUpdate failed.");
1452 result = MU_FAILURE;
1453 }
1454 }
1455 free(propName);
1456 free(propValue);
1457 }
1458 break;
1459 }
1460 iterator++;
1461 }
1462 }
1463 }
1464 }
1465 STRING_delete(output);
1466 }
1467 }
1468 STRING_TOKENIZER_destroy(token);
1469 }
1470 else
1471 {
1472 LogError("Unable to create Tokenizer object.");
1473 result = MU_FAILURE;
1474 }
1475 return result;
1476}
1477
1478static void mqtt_notification_callback(MQTT_MESSAGE_HANDLE msgHandle, void* callbackCtx)
1479{
1480 /* Tests_SRS_IOTHUB_MQTT_TRANSPORT_07_051: [ If msgHandle or callbackCtx is NULL, mqtt_notification_callback shall do nothing. ] */
1481 if (msgHandle != NULL && callbackCtx != NULL)
1482 {
1483 /* Tests_SRS_IOTHUB_MQTT_TRANSPORT_07_052: [ mqtt_notification_callback shall extract the topic Name from the MQTT_MESSAGE_HANDLE. ] */
1484 const char* topic_resp = mqttmessage_getTopicName(msgHandle);
1485 if (topic_resp == NULL)
1486 {
1487 LogError("Failure: NULL topic name encountered");
1488 }
1489 else
1490 {
1491 PMQTTTRANSPORT_HANDLE_DATA transportData = (PMQTTTRANSPORT_HANDLE_DATA)callbackCtx;
1492
1493 IOTHUB_IDENTITY_TYPE type = retrieve_topic_type(topic_resp, STRING_c_str(transportData->topic_InputQueue));
1494 if (type == IOTHUB_TYPE_DEVICE_TWIN)
1495 {
1496 size_t request_id;
1497 int status_code;
1498 bool notification_msg;
1499 if (parse_device_twin_topic_info(topic_resp, &notification_msg, &request_id, &status_code) != 0)
1500 {
1501 LogError("Failure: parsing device topic info");
1502 }
1503 else
1504 {
1505 const APP_PAYLOAD* payload = mqttmessage_getApplicationMsg(msgHandle);
1506 if (notification_msg)
1507 {
1508 transportData->transport_callbacks.twin_retrieve_prop_complete_cb(DEVICE_TWIN_UPDATE_PARTIAL, payload->message, payload->length, transportData->transport_ctx);
1509 }
1510 else
1511 {
1512 PDLIST_ENTRY dev_twin_item = transportData->ack_waiting_queue.Flink;
1513 while (dev_twin_item != &transportData->ack_waiting_queue)
1514 {
1515 DLIST_ENTRY saveListEntry;
1516 saveListEntry.Flink = dev_twin_item->Flink;
1517 MQTT_DEVICE_TWIN_ITEM* msg_entry = containingRecord(dev_twin_item, MQTT_DEVICE_TWIN_ITEM, entry);
1518 if (request_id == msg_entry->packet_id)
1519 {
1520 (void)DList_RemoveEntryList(dev_twin_item);
1521 if (msg_entry->device_twin_msg_type == RETRIEVE_PROPERTIES)
1522 {
1523 if (msg_entry->userCallback == NULL)
1524 {
1525 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_054: [ If type is IOTHUB_TYPE_DEVICE_TWIN, then on success if msg_type is RETRIEVE_PROPERTIES then mqtt_notification_callback shall call IoTHubClientCore_LL_RetrievePropertyComplete... ] */
1526 transportData->transport_callbacks.twin_retrieve_prop_complete_cb(DEVICE_TWIN_UPDATE_COMPLETE, payload->message, payload->length, transportData->transport_ctx);
1527 // Only after receiving device twin request should we start listening for patches.
1528 (void)subscribeToNotifyStateIfNeeded(transportData);
1529 }
1530 else
1531 {
1532 // This is a on-demand get twin request.
1533 msg_entry->userCallback(DEVICE_TWIN_UPDATE_COMPLETE, payload->message, payload->length, msg_entry->userContext);
1534 }
1535 }
1536 else
1537 {
1538 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_055: [ if device_twin_msg_type is not RETRIEVE_PROPERTIES then mqtt_notification_callback shall call IoTHubClientCore_LL_ReportedStateComplete ] */
1539 transportData->transport_callbacks.twin_rpt_state_complete_cb(msg_entry->iothub_msg_id, status_code, transportData->transport_ctx);
1540 // Only after receiving device twin request should we start listening for patches.
1541 (void)subscribeToNotifyStateIfNeeded(transportData);
1542 }
1543
1544 destroy_device_twin_get_message(msg_entry);
1545 break;
1546 }
1547 dev_twin_item = saveListEntry.Flink;
1548 }
1549 }
1550 }
1551 }
1552 else if (type == IOTHUB_TYPE_DEVICE_METHODS)
1553 {
1554 STRING_HANDLE method_name = STRING_new();
1555 if (method_name == NULL)
1556 {
1557 LogError("Failure: allocating method_name string value");
1558 }
1559 else
1560 {
1561 DEVICE_METHOD_INFO* dev_method_info = malloc(sizeof(DEVICE_METHOD_INFO));
1562 if (dev_method_info == NULL)
1563 {
1564 LogError("Failure: allocating DEVICE_METHOD_INFO object");
1565 }
1566 else
1567 {
1568 dev_method_info->request_id = STRING_new();
1569 if (dev_method_info->request_id == NULL)
1570 {
1571 LogError("Failure constructing request_id string");
1572 free(dev_method_info);
1573 }
1574 else if (retrieve_device_method_rid_info(topic_resp, method_name, dev_method_info->request_id) != 0)
1575 {
1576 LogError("Failure: retrieve device topic info");
1577 STRING_delete(dev_method_info->request_id);
1578 free(dev_method_info);
1579 }
1580 else
1581 {
1582 /* CodesSRS_IOTHUB_MQTT_TRANSPORT_07_053: [ If type is IOTHUB_TYPE_DEVICE_METHODS, then on success mqtt_notification_callback shall call IoTHubClientCore_LL_DeviceMethodComplete. ] */
1583 const APP_PAYLOAD* payload = mqttmessage_getApplicationMsg(msgHandle);
1584 if (transportData->transport_callbacks.method_complete_cb(STRING_c_str(method_name), payload->message, payload->length, (void*)dev_method_info, transportData->transport_ctx) != 0)
1585 {
1586 LogError("Failure: IoTHubClientCore_LL_DeviceMethodComplete");
1587 STRING_delete(dev_method_info->request_id);
1588 free(dev_method_info);
1589 }
1590 }
1591 }
1592 STRING_delete(method_name);
1593 }
1594 }
1595 else
1596 {
1597 const APP_PAYLOAD* appPayload = mqttmessage_getApplicationMsg(msgHandle);
1598 IOTHUB_MESSAGE_HANDLE IoTHubMessage = IoTHubMessage_CreateFromByteArray(appPayload->message, appPayload->length);
1599 if (IoTHubMessage == NULL)
1600 {
1601 LogError("Failure: IotHub Message creation has failed.");
1602 }
1603 else
1604 {
1605 if ((type == IOTHUB_TYPE_EVENT_QUEUE) && (addInputNamePropertyToMessage(IoTHubMessage, topic_resp) != 0))
1606 {
1607 LogError("failure adding input name to property.");
1608 }
1609 // Will need to update this when the service has messages that can be rejected
1610 else if (extractMqttProperties(IoTHubMessage, topic_resp, transportData->auto_url_encode_decode) != 0)
1611 {
1612 LogError("failure extracting mqtt properties.");
1613 }
1614 else
1615 {
1616 MESSAGE_CALLBACK_INFO* messageData = (MESSAGE_CALLBACK_INFO*)malloc(sizeof(MESSAGE_CALLBACK_INFO));
1617 if (messageData == NULL)
1618 {
1619 LogError("malloc failed");
1620 }
1621 else
1622 {
1623 messageData->messageHandle = IoTHubMessage;
1624 messageData->transportContext = NULL;
1625
1626 if (type == IOTHUB_TYPE_EVENT_QUEUE)
1627 {
1628 // Codes_SRS_IOTHUB_MQTT_TRANSPORT_31_065: [ If type is IOTHUB_TYPE_TELEMETRY and sent to an input queue, then on success `mqtt_notification_callback` shall call `IoTHubClient_LL_MessageCallback`. ]
1629 if (!transportData->transport_callbacks.msg_input_cb(messageData, transportData->transport_ctx))
1630 {
1631 LogError("IoTHubClientCore_LL_MessageCallbackreturned false");
1632
1633 IoTHubMessage_Destroy(IoTHubMessage);
1634 free(messageData);
1635 }
1636 }
1637 else
1638 {
1639 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_056: [ If type is IOTHUB_TYPE_TELEMETRY, then on success mqtt_notification_callback shall call IoTHubClientCore_LL_MessageCallback. ] */
1640 if (!transportData->transport_callbacks.msg_cb(messageData, transportData->transport_ctx))
1641 {
1642 LogError("IoTHubClientCore_LL_MessageCallback returned false");
1643 IoTHubMessage_Destroy(IoTHubMessage);
1644 free(messageData);
1645 }
1646 }
1647 }
1648 }
1649 }
1650 }
1651 }
1652 }
1653}
1654
1655static void mqtt_operation_complete_callback(MQTT_CLIENT_HANDLE handle, MQTT_CLIENT_EVENT_RESULT actionResult, const void* msgInfo, void* callbackCtx)
1656{
1657 (void)handle;
1658 if (callbackCtx != NULL)
1659 {
1660 PMQTTTRANSPORT_HANDLE_DATA transport_data = (PMQTTTRANSPORT_HANDLE_DATA)callbackCtx;
1661
1662 switch (actionResult)
1663 {
1664 case MQTT_CLIENT_ON_PUBLISH_ACK:
1665 case MQTT_CLIENT_ON_PUBLISH_COMP:
1666 {
1667 const PUBLISH_ACK* puback = (const PUBLISH_ACK*)msgInfo;
1668 if (puback != NULL)
1669 {
1670 PDLIST_ENTRY currentListEntry = transport_data->telemetry_waitingForAck.Flink;
1671 while (currentListEntry != &transport_data->telemetry_waitingForAck)
1672 {
1673 MQTT_MESSAGE_DETAILS_LIST* mqttMsgEntry = containingRecord(currentListEntry, MQTT_MESSAGE_DETAILS_LIST, entry);
1674 DLIST_ENTRY saveListEntry;
1675 saveListEntry.Flink = currentListEntry->Flink;
1676
1677 if (puback->packetId == mqttMsgEntry->packet_id)
1678 {
1679 (void)DList_RemoveEntryList(currentListEntry); //First remove the item from Waiting for Ack List.
1680 sendMsgComplete(mqttMsgEntry->iotHubMessageEntry, transport_data, IOTHUB_CLIENT_CONFIRMATION_OK);
1681 free(mqttMsgEntry);
1682 }
1683 currentListEntry = saveListEntry.Flink;
1684 }
1685 }
1686 else
1687 {
1688 LogError("Failure: MQTT_CLIENT_ON_PUBLISH_ACK publish_ack structure NULL.");
1689 }
1690 break;
1691 }
1692 case MQTT_CLIENT_ON_CONNACK:
1693 {
1694 const CONNECT_ACK* connack = (const CONNECT_ACK*)msgInfo;
1695 if (connack != NULL)
1696 {
1697 if (connack->returnCode == CONNECTION_ACCEPTED)
1698 {
1699 // The connect packet has been acked
1700 transport_data->currPacketState = CONNACK_TYPE;
1701 transport_data->isRecoverableError = true;
1702 transport_data->mqttClientStatus = MQTT_CLIENT_STATUS_CONNECTED;
1703
1704 // Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_09_008: [ Upon successful connection the retry control shall be reset using retry_control_reset() ]
1705 retry_control_reset(transport_data->retry_control_handle);
1706
1707 transport_data->transport_callbacks.connection_status_cb(IOTHUB_CLIENT_CONNECTION_AUTHENTICATED, IOTHUB_CLIENT_CONNECTION_OK, transport_data->transport_ctx);
1708 }
1709 else
1710 {
1711 if (connack->returnCode == CONN_REFUSED_SERVER_UNAVAIL)
1712 {
1713 transport_data->transport_callbacks.connection_status_cb(IOTHUB_CLIENT_CONNECTION_UNAUTHENTICATED, IOTHUB_CLIENT_CONNECTION_DEVICE_DISABLED, transport_data->transport_ctx);
1714 }
1715 else if (connack->returnCode == CONN_REFUSED_BAD_USERNAME_PASSWORD || connack->returnCode == CONN_REFUSED_ID_REJECTED)
1716 {
1717 transport_data->isRecoverableError = false;
1718 transport_data->transport_callbacks.connection_status_cb(IOTHUB_CLIENT_CONNECTION_UNAUTHENTICATED, IOTHUB_CLIENT_CONNECTION_BAD_CREDENTIAL, transport_data->transport_ctx);
1719 }
1720 else if (connack->returnCode == CONN_REFUSED_NOT_AUTHORIZED)
1721 {
1722 transport_data->transport_callbacks.connection_status_cb(IOTHUB_CLIENT_CONNECTION_UNAUTHENTICATED, IOTHUB_CLIENT_CONNECTION_DEVICE_DISABLED, transport_data->transport_ctx);
1723 }
1724 else if (connack->returnCode == CONN_REFUSED_UNACCEPTABLE_VERSION)
1725 {
1726 transport_data->isRecoverableError = false;
1727 }
1728 LogError("Connection Not Accepted: 0x%x: %s", connack->returnCode, retrieve_mqtt_return_codes(connack->returnCode));
1729 transport_data->mqttClientStatus = MQTT_CLIENT_STATUS_PENDING_CLOSE;
1730 transport_data->currPacketState = PACKET_TYPE_ERROR;
1731 }
1732 }
1733 else
1734 {
1735 LogError("MQTT_CLIENT_ON_CONNACK CONNACK parameter is NULL.");
1736 }
1737 break;
1738 }
1739 case MQTT_CLIENT_ON_SUBSCRIBE_ACK:
1740 {
1741 const SUBSCRIBE_ACK* suback = (const SUBSCRIBE_ACK*)msgInfo;
1742 if (suback != NULL)
1743 {
1744 size_t index = 0;
1745 for (index = 0; index < suback->qosCount; index++)
1746 {
1747 if (suback->qosReturn[index] == DELIVER_FAILURE)
1748 {
1749 LogError("Subscribe delivery failure of subscribe %lu", (unsigned long)index);
1750 }
1751 }
1752 // The subscribed packet has been acked
1753 transport_data->currPacketState = SUBACK_TYPE;
1754
1755 // Is this a twin message
1756 if (suback->packetId == transport_data->twin_resp_packet_id)
1757 {
1758 transport_data->twin_resp_sub_recv = true;
1759 }
1760 }
1761 else
1762 {
1763 LogError("Failure: MQTT_CLIENT_ON_SUBSCRIBE_ACK SUBSCRIBE_ACK parameter is NULL.");
1764 }
1765 break;
1766 }
1767 case MQTT_CLIENT_ON_PUBLISH_RECV:
1768 case MQTT_CLIENT_ON_PUBLISH_REL:
1769 {
1770 // Currently not used
1771 break;
1772 }
1773 case MQTT_CLIENT_ON_DISCONNECT:
1774 {
1775 // Close the client so we can reconnect again
1776 transport_data->mqttClientStatus = MQTT_CLIENT_STATUS_NOT_CONNECTED;
1777 break;
1778 }
1779 case MQTT_CLIENT_ON_UNSUBSCRIBE_ACK:
1780 case MQTT_CLIENT_ON_PING_RESPONSE:
1781 default:
1782 {
1783 break;
1784 }
1785 }
1786 }
1787}
1788
1789// Prior to creating a new connection, if we have an existing xioTransport that has been connected before
1790// we need to clear it now or else cached settings (especially TLS when communicating with HTTP proxies)
1791// will break reconnection attempt.
1792static void ResetConnectionIfNecessary(PMQTTTRANSPORT_HANDLE_DATA transport_data)
1793{
1794 if (transport_data->xioTransport != NULL && transport_data->conn_attempted)
1795 {
1796 OPTIONHANDLER_HANDLE options = xio_retrieveoptions(transport_data->xioTransport);
1797 set_saved_tls_options(transport_data, options);
1798
1799 xio_destroy(transport_data->xioTransport);
1800 transport_data->xioTransport = NULL;
1801 }
1802}
1803
/* Disconnect-complete callback: ctx points to an int flag that is set to 1.
 * A NULL ctx is ignored. */
static void mqtt_disconnect_cb(void* ctx)
{
    int* disconnect_flag = (int*)ctx;

    if (disconnect_flag == NULL)
    {
        return;
    }

    *disconnect_flag = 1;
}
1812
1813static void DisconnectFromClient(PMQTTTRANSPORT_HANDLE_DATA transport_data)
1814{
1815 if (transport_data->currPacketState != DISCONNECT_TYPE)
1816 {
1817 if (!transport_data->isDestroyCalled)
1818 {
1819 OPTIONHANDLER_HANDLE options = xio_retrieveoptions(transport_data->xioTransport);
1820 set_saved_tls_options(transport_data, options);
1821 }
1822 // Ensure the disconnect message is sent
1823 if (transport_data->mqttClientStatus == MQTT_CLIENT_STATUS_CONNECTED)
1824 {
1825 transport_data->disconnect_recv_flag = 0;
1826 (void)mqtt_client_disconnect(transport_data->mqttClient, mqtt_disconnect_cb, &transport_data->disconnect_recv_flag);
1827 size_t disconnect_ctr = 0;
1828 do
1829 {
1830 mqtt_client_dowork(transport_data->mqttClient);
1831 disconnect_ctr++;
1832 ThreadAPI_Sleep(50);
1833 } while ((disconnect_ctr < MAX_DISCONNECT_VALUE) && (transport_data->disconnect_recv_flag == 0));
1834 }
1835 xio_destroy(transport_data->xioTransport);
1836 transport_data->xioTransport = NULL;
1837
1838 transport_data->device_twin_get_sent = false;
1839 transport_data->mqttClientStatus = MQTT_CLIENT_STATUS_NOT_CONNECTED;
1840 transport_data->currPacketState = DISCONNECT_TYPE;
1841 }
1842}
1843
1844static void mqtt_error_callback(MQTT_CLIENT_HANDLE handle, MQTT_CLIENT_EVENT_ERROR error, void* callbackCtx)
1845{
1846 (void)handle;
1847 if (callbackCtx != NULL)
1848 {
1849 PMQTTTRANSPORT_HANDLE_DATA transport_data = (PMQTTTRANSPORT_HANDLE_DATA)callbackCtx;
1850 switch (error)
1851 {
1852 case MQTT_CLIENT_CONNECTION_ERROR:
1853 {
1854 transport_data->transport_callbacks.connection_status_cb(IOTHUB_CLIENT_CONNECTION_UNAUTHENTICATED, IOTHUB_CLIENT_CONNECTION_NO_NETWORK, transport_data->transport_ctx);
1855 break;
1856 }
1857 case MQTT_CLIENT_COMMUNICATION_ERROR:
1858 {
1859 transport_data->transport_callbacks.connection_status_cb(IOTHUB_CLIENT_CONNECTION_UNAUTHENTICATED, IOTHUB_CLIENT_CONNECTION_COMMUNICATION_ERROR, transport_data->transport_ctx);
1860 break;
1861 }
1862 case MQTT_CLIENT_NO_PING_RESPONSE:
1863 {
1864 LogError("Mqtt Ping Response was not encountered. Reconnecting device...");
1865 break;
1866 }
1867 case MQTT_CLIENT_PARSE_ERROR:
1868 case MQTT_CLIENT_MEMORY_ERROR:
1869 case MQTT_CLIENT_UNKNOWN_ERROR:
1870 {
1871 LogError("INTERNAL ERROR: unexpected error value received %s", MU_ENUM_TO_STRING(MQTT_CLIENT_EVENT_ERROR, error));
1872 break;
1873 }
1874 }
1875 if (transport_data->mqttClientStatus != MQTT_CLIENT_STATUS_PENDING_CLOSE)
1876 {
1877 // We have encountered an mqtt protocol error in an non-closing state
1878 // The best course of action is to execute a shutdown of the mqtt/tls/socket
1879 // layer and then attempt to reconnect
1880 transport_data->mqttClientStatus = MQTT_CLIENT_STATUS_EXECUTE_DISCONNECT;
1881 }
1882 transport_data->currPacketState = PACKET_TYPE_ERROR;
1883 transport_data->device_twin_get_sent = false;
1884 if (transport_data->topic_MqttMessage != NULL)
1885 {
1886 transport_data->topics_ToSubscribe |= SUBSCRIBE_TELEMETRY_TOPIC;
1887 }
1888 if (transport_data->topic_GetState != NULL)
1889 {
1890 transport_data->topics_ToSubscribe |= SUBSCRIBE_GET_REPORTED_STATE_TOPIC;
1891 }
1892 if (transport_data->topic_NotifyState != NULL)
1893 {
1894 transport_data->topics_ToSubscribe |= SUBSCRIBE_NOTIFICATION_STATE_TOPIC;
1895 }
1896 if (transport_data->topic_DeviceMethods != NULL)
1897 {
1898 transport_data->topics_ToSubscribe |= SUBSCRIBE_DEVICE_METHOD_TOPIC;
1899 }
1900 if (transport_data->topic_InputQueue != NULL)
1901 {
1902 transport_data->topics_ToSubscribe |= SUBSCRIBE_INPUT_QUEUE_TOPIC;
1903 }
1904 }
1905 else
1906 {
1907 LogError("Failure: mqtt called back with null context.");
1908 }
1909}
1910
1911static void SubscribeToMqttProtocol(PMQTTTRANSPORT_HANDLE_DATA transport_data)
1912{
1913 if (transport_data->topics_ToSubscribe != UNSUBSCRIBE_FROM_TOPIC)
1914 {
1915 uint32_t topic_subscription = 0;
1916 size_t subscribe_count = 0;
1917 uint16_t packet_id = get_next_packet_id(transport_data);
1918 SUBSCRIBE_PAYLOAD subscribe[SUBSCRIBE_TOPIC_COUNT];
1919 if ((transport_data->topic_MqttMessage != NULL) && (SUBSCRIBE_TELEMETRY_TOPIC & transport_data->topics_ToSubscribe))
1920 {
1921 subscribe[subscribe_count].subscribeTopic = STRING_c_str(transport_data->topic_MqttMessage);
1922 subscribe[subscribe_count].qosReturn = DELIVER_AT_LEAST_ONCE;
1923 topic_subscription |= SUBSCRIBE_TELEMETRY_TOPIC;
1924 subscribe_count++;
1925 }
1926 if ((transport_data->topic_GetState != NULL) && (SUBSCRIBE_GET_REPORTED_STATE_TOPIC & transport_data->topics_ToSubscribe))
1927 {
1928 subscribe[subscribe_count].subscribeTopic = STRING_c_str(transport_data->topic_GetState);
1929 subscribe[subscribe_count].qosReturn = DELIVER_AT_MOST_ONCE;
1930 topic_subscription |= SUBSCRIBE_GET_REPORTED_STATE_TOPIC;
1931 subscribe_count++;
1932 transport_data->twin_resp_packet_id = packet_id;
1933 }
1934 if ((transport_data->topic_NotifyState != NULL) && (SUBSCRIBE_NOTIFICATION_STATE_TOPIC & transport_data->topics_ToSubscribe))
1935 {
1936 subscribe[subscribe_count].subscribeTopic = STRING_c_str(transport_data->topic_NotifyState);
1937 subscribe[subscribe_count].qosReturn = DELIVER_AT_MOST_ONCE;
1938 topic_subscription |= SUBSCRIBE_NOTIFICATION_STATE_TOPIC;
1939 subscribe_count++;
1940 }
1941 if ((transport_data->topic_DeviceMethods != NULL) && (SUBSCRIBE_DEVICE_METHOD_TOPIC & transport_data->topics_ToSubscribe))
1942 {
1943 subscribe[subscribe_count].subscribeTopic = STRING_c_str(transport_data->topic_DeviceMethods);
1944 subscribe[subscribe_count].qosReturn = DELIVER_AT_MOST_ONCE;
1945 topic_subscription |= SUBSCRIBE_DEVICE_METHOD_TOPIC;
1946 subscribe_count++;
1947 }
1948 if ((transport_data->topic_InputQueue != NULL) && (SUBSCRIBE_INPUT_QUEUE_TOPIC & transport_data->topics_ToSubscribe))
1949 {
1950 subscribe[subscribe_count].subscribeTopic = STRING_c_str(transport_data->topic_InputQueue);
1951 subscribe[subscribe_count].qosReturn = DELIVER_AT_MOST_ONCE;
1952 topic_subscription |= SUBSCRIBE_INPUT_QUEUE_TOPIC;
1953 subscribe_count++;
1954 }
1955
1956 if (subscribe_count != 0)
1957 {
1958 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_016: [IoTHubTransport_MQTT_Common_Subscribe shall call mqtt_client_subscribe to subscribe to the Message Topic.] */
1959 if (mqtt_client_subscribe(transport_data->mqttClient, packet_id, subscribe, subscribe_count) != 0)
1960 {
1961 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_017: [Upon failure IoTHubTransport_MQTT_Common_Subscribe shall return a non-zero value.] */
1962 LogError("Failure: mqtt_client_subscribe returned error.");
1963 }
1964 else
1965 {
1966 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_018: [On success IoTHubTransport_MQTT_Common_Subscribe shall return 0.] */
1967 transport_data->topics_ToSubscribe &= ~topic_subscription;
1968 transport_data->currPacketState = SUBSCRIBE_TYPE;
1969 }
1970 }
1971 else
1972 {
1973 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_017: [Upon failure IoTHubTransport_MQTT_Common_Subscribe shall return a non-zero value.] */
1974 LogError("Failure: subscribe_topic is empty.");
1975 }
1976 transport_data->currPacketState = SUBSCRIBE_TYPE;
1977 }
1978 else
1979 {
1980 transport_data->currPacketState = PUBLISH_TYPE;
1981 }
1982}
1983
1984static bool RetrieveMessagePayload(IOTHUB_MESSAGE_HANDLE messageHandle, const unsigned char** payload, size_t* length)
1985{
1986 bool result;
1987 IOTHUBMESSAGE_CONTENT_TYPE contentType = IoTHubMessage_GetContentType(messageHandle);
1988 if (contentType == IOTHUBMESSAGE_BYTEARRAY)
1989 {
1990 if (IoTHubMessage_GetByteArray(messageHandle, &(*payload), length) != IOTHUB_MESSAGE_OK)
1991 {
1992 LogError("Failure result from IoTHubMessage_GetByteArray");
1993 result = false;
1994 *length = 0;
1995 }
1996 else
1997 {
1998 result = true;
1999 }
2000 }
2001 else if (contentType == IOTHUBMESSAGE_STRING)
2002 {
2003 *payload = (const unsigned char*)IoTHubMessage_GetString(messageHandle);
2004 if (*payload == NULL)
2005 {
2006 LogError("Failure result from IoTHubMessage_GetString");
2007 result = false;
2008 *length = 0;
2009 }
2010 else
2011 {
2012 *length = strlen((const char*)*payload);
2013 result = true;
2014 }
2015 }
2016 else
2017 {
2018 result = false;
2019 *length = 0;
2020 }
2021 return result;
2022}
2023
2024static void process_queued_ack_messages(PMQTTTRANSPORT_HANDLE_DATA transport_data)
2025{
2026 PDLIST_ENTRY current_entry = transport_data->telemetry_waitingForAck.Flink;
2027 tickcounter_ms_t current_ms;
2028 (void)tickcounter_get_current_ms(transport_data->msgTickCounter, &current_ms);
2029 while (current_entry != &transport_data->telemetry_waitingForAck)
2030 {
2031 MQTT_MESSAGE_DETAILS_LIST* msg_detail_entry = containingRecord(current_entry, MQTT_MESSAGE_DETAILS_LIST, entry);
2032 DLIST_ENTRY nextListEntry;
2033 nextListEntry.Flink = current_entry->Flink;
2034
2035 if (((current_ms - msg_detail_entry->msgPublishTime) / 1000) > RESEND_TIMEOUT_VALUE_MIN)
2036 {
2037 if (msg_detail_entry->retryCount >= MAX_SEND_RECOUNT_LIMIT)
2038 {
2039 sendMsgComplete(msg_detail_entry->iotHubMessageEntry, transport_data, IOTHUB_CLIENT_CONFIRMATION_MESSAGE_TIMEOUT);
2040 (void)DList_RemoveEntryList(current_entry);
2041 free(msg_detail_entry);
2042
2043 DisconnectFromClient(transport_data);
2044 }
2045 else
2046 {
2047 // Ensure that the packet state is PUBLISH_TYPE and then attempt to send the message
2048 // again
2049 if (transport_data->currPacketState == PUBLISH_TYPE)
2050 {
2051 size_t messageLength;
2052 const unsigned char* messagePayload = NULL;
2053 if (!RetrieveMessagePayload(msg_detail_entry->iotHubMessageEntry->messageHandle, &messagePayload, &messageLength))
2054 {
2055 (void)DList_RemoveEntryList(current_entry);
2056 sendMsgComplete(msg_detail_entry->iotHubMessageEntry, transport_data, IOTHUB_CLIENT_CONFIRMATION_ERROR);
2057 }
2058 else
2059 {
2060 if (publish_mqtt_telemetry_msg(transport_data, msg_detail_entry, messagePayload, messageLength) != 0)
2061 {
2062 (void)DList_RemoveEntryList(current_entry);
2063 sendMsgComplete(msg_detail_entry->iotHubMessageEntry, transport_data, IOTHUB_CLIENT_CONFIRMATION_ERROR);
2064 free(msg_detail_entry);
2065 }
2066 }
2067 }
2068 else
2069 {
2070 msg_detail_entry->retryCount++;
2071 msg_detail_entry->msgPublishTime = current_ms;
2072 }
2073 }
2074 }
2075 current_entry = nextListEntry.Flink;
2076 }
2077}
2078
2079static int GetTransportProviderIfNecessary(PMQTTTRANSPORT_HANDLE_DATA transport_data)
2080{
2081 int result;
2082
2083 if (transport_data->xioTransport == NULL)
2084 {
2085 // construct address
2086 const char* hostAddress = STRING_c_str(transport_data->hostAddress);
2087 MQTT_TRANSPORT_PROXY_OPTIONS mqtt_proxy_options;
2088
2089 /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_01_011: [ If no `proxy_data` option has been set, NULL shall be passed as the argument `mqtt_transport_proxy_options` when calling the function `get_io_transport` passed in `IoTHubTransport_MQTT_Common__Create`. ]*/
2090 mqtt_proxy_options.host_address = transport_data->http_proxy_hostname;
2091 mqtt_proxy_options.port = transport_data->http_proxy_port;
2092 mqtt_proxy_options.username = transport_data->http_proxy_username;
2093 mqtt_proxy_options.password = transport_data->http_proxy_password;
2094
2095 /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_01_010: [ If the `proxy_data` option has been set, the proxy options shall be filled in the argument `mqtt_transport_proxy_options` when calling the function `get_io_transport` passed in `IoTHubTransport_MQTT_Common__Create` to obtain the underlying IO handle. ]*/
2096 transport_data->xioTransport = transport_data->get_io_transport(hostAddress, (transport_data->http_proxy_hostname == NULL) ? NULL : &mqtt_proxy_options);
2097 if (transport_data->xioTransport == NULL)
2098 {
2099 LogError("Unable to create the lower level TLS layer.");
2100 result = MU_FAILURE;
2101 }
2102 else
2103 {
2104 if (transport_data->saved_tls_options != NULL)
2105 {
2106 if (OptionHandler_FeedOptions(transport_data->saved_tls_options, transport_data->xioTransport) != OPTIONHANDLER_OK)
2107 {
2108 LogError("Failed feeding existing options to new TLS instance.");
2109 result = MU_FAILURE;
2110 }
2111 else
2112 {
2113 // The tlsio has the options, so our copy can be deleted
2114 set_saved_tls_options(transport_data, NULL);
2115 result = 0;
2116 }
2117 }
2118 else if (IoTHubClient_Auth_Get_Credential_Type(transport_data->authorization_module) == IOTHUB_CREDENTIAL_TYPE_X509_ECC)
2119 {
2120 if (IoTHubClient_Auth_Set_xio_Certificate(transport_data->authorization_module, transport_data->xioTransport) != 0)
2121 {
2122 LogError("Unable to create the lower level TLS layer.");
2123 result = MU_FAILURE;
2124 }
2125 else
2126 {
2127 result = 0;
2128 }
2129 }
2130 else
2131 {
2132 result = 0;
2133 }
2134 }
2135 }
2136 else
2137 {
2138 result = 0;
2139 }
2140 return result;
2141}
2142
2143static STRING_HANDLE buildClientId(const char* device_id, const char* module_id)
2144{
2145 if (module_id == NULL)
2146 {
2147 return STRING_construct_sprintf("%s", device_id);
2148 }
2149 else
2150 {
2151 return STRING_construct_sprintf("%s/%s", device_id, module_id);
2152 }
2153}
2154
2155static int SendMqttConnectMsg(PMQTTTRANSPORT_HANDLE_DATA transport_data)
2156{
2157 int result;
2158
2159 char* sasToken = NULL;
2160 result = 0;
2161
2162 IOTHUB_CREDENTIAL_TYPE cred_type = IoTHubClient_Auth_Get_Credential_Type(transport_data->authorization_module);
2163 if (cred_type == IOTHUB_CREDENTIAL_TYPE_DEVICE_KEY || cred_type == IOTHUB_CREDENTIAL_TYPE_DEVICE_AUTH)
2164 {
2165 sasToken = IoTHubClient_Auth_Get_SasToken(transport_data->authorization_module, STRING_c_str(transport_data->devicesAndModulesPath), 0, NULL);
2166 if (sasToken == NULL)
2167 {
2168 LogError("failure getting sas token from IoTHubClient_Auth_Get_SasToken.");
2169 result = MU_FAILURE;
2170 }
2171 }
2172 else if (cred_type == IOTHUB_CREDENTIAL_TYPE_SAS_TOKEN)
2173 {
2174 SAS_TOKEN_STATUS token_status = IoTHubClient_Auth_Is_SasToken_Valid(transport_data->authorization_module);
2175 if (token_status == SAS_TOKEN_STATUS_INVALID)
2176 {
2177 transport_data->transport_callbacks.connection_status_cb(IOTHUB_CLIENT_CONNECTION_UNAUTHENTICATED, IOTHUB_CLIENT_CONNECTION_EXPIRED_SAS_TOKEN, transport_data->transport_ctx);
2178 result = MU_FAILURE;
2179 }
2180 else if (token_status == SAS_TOKEN_STATUS_FAILED)
2181 {
2182 transport_data->transport_callbacks.connection_status_cb(IOTHUB_CLIENT_CONNECTION_UNAUTHENTICATED, IOTHUB_CLIENT_CONNECTION_BAD_CREDENTIAL, transport_data->transport_ctx);
2183 result = MU_FAILURE;
2184 }
2185 else
2186 {
2187 sasToken = IoTHubClient_Auth_Get_SasToken(transport_data->authorization_module, NULL, 0, NULL);
2188 if (sasToken == NULL)
2189 {
2190 LogError("failure getting sas Token.");
2191 result = MU_FAILURE;
2192 }
2193 }
2194 }
2195
2196 if (result == 0)
2197 {
2198 if (!transport_data->isProductInfoSet)
2199 {
2200 // This requires the iothubClientHandle, which sadly the MQTT transport only gets on DoWork, so this code still needs to remain here.
2201 // The correct place for this would be in the Create method, but we don't get the client handle there.
2202 // Also, when device multiplexing is used, the customer creates the transport directly and explicitly, when the client is still not created.
2203 // This will be a major hurdle when we add device multiplexing to MQTT transport.
2204
2205 STRING_HANDLE clone;
2206 const char* product_info = transport_data->transport_callbacks.prod_info_cb(transport_data->transport_ctx);
2207 if (product_info == NULL)
2208 {
2209 clone = STRING_construct_sprintf("%s%%2F%s", CLIENT_DEVICE_TYPE_PREFIX, IOTHUB_SDK_VERSION);
2210 }
2211 else
2212 {
2213 clone = URL_EncodeString(product_info);
2214 }
2215
2216 if (clone == NULL)
2217 {
2218 LogError("Failed obtaining the product info");
2219 }
2220 else
2221 {
2222 if (STRING_concat_with_STRING(transport_data->configPassedThroughUsername, clone) != 0)
2223 {
2224 LogError("Failed concatenating the product info");
2225 }
2226 else
2227 {
2228 transport_data->isProductInfoSet = true;
2229 }
2230
2231 STRING_delete(clone);
2232 }
2233 }
2234
2235 STRING_HANDLE clientId;
2236
2237 clientId = buildClientId(STRING_c_str(transport_data->device_id), STRING_c_str(transport_data->module_id));
2238 if (NULL == clientId)
2239 {
2240 LogError("Unable to allocate clientId");
2241 result = MU_FAILURE;
2242 }
2243 else
2244 {
2245 MQTT_CLIENT_OPTIONS options = { 0 };
2246 options.clientId = (char*)STRING_c_str(clientId);
2247 options.willMessage = NULL;
2248 options.username = (char*)STRING_c_str(transport_data->configPassedThroughUsername);
2249 if (sasToken != NULL)
2250 {
2251 options.password = sasToken;
2252 }
2253 options.keepAliveInterval = transport_data->keepAliveValue;
2254 options.useCleanSession = false;
2255 options.qualityOfServiceValue = DELIVER_AT_LEAST_ONCE;
2256
2257 if (GetTransportProviderIfNecessary(transport_data) == 0)
2258 {
2259 transport_data->conn_attempted = true;
2260 if (mqtt_client_connect(transport_data->mqttClient, transport_data->xioTransport, &options) != 0)
2261 {
2262 LogError("failure connecting to address %s.", STRING_c_str(transport_data->hostAddress));
2263 result = MU_FAILURE;
2264 }
2265 else
2266 {
2267 transport_data->currPacketState = CONNECT_TYPE;
2268 transport_data->isRetryExpiredCallbackSet = false;
2269 (void)tickcounter_get_current_ms(transport_data->msgTickCounter, &transport_data->mqtt_connect_time);
2270 result = 0;
2271 }
2272 }
2273 else
2274 {
2275 result = MU_FAILURE;
2276 }
2277
2278 if (sasToken != NULL)
2279 {
2280 free(sasToken);
2281 }
2282 STRING_delete(clientId);
2283 }
2284
2285 }
2286 return result;
2287}
2288
2289static int InitializeConnection(PMQTTTRANSPORT_HANDLE_DATA transport_data)
2290{
2291 int result = 0;
2292
2293 // Make sure we're not destroying the object
2294 if (!transport_data->isDestroyCalled)
2295 {
2296 RETRY_ACTION retry_action = RETRY_ACTION_RETRY_LATER;
2297
2298 // Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_09_007: [ IoTHubTransport_MQTT_Common_DoWork shall try to reconnect according to the current retry policy set ]
2299 if (transport_data->mqttClientStatus == MQTT_CLIENT_STATUS_NOT_CONNECTED && transport_data->isRecoverableError)
2300 {
2301 // Note: in case retry_control_should_retry fails, the reconnection shall be attempted anyway (defaulting to policy IOTHUB_CLIENT_RETRY_IMMEDIATE).
2302 if (retry_control_should_retry(transport_data->retry_control_handle, &retry_action) != 0 || retry_action == RETRY_ACTION_RETRY_NOW)
2303 {
2304 if (tickcounter_get_current_ms(transport_data->msgTickCounter, &transport_data->connectTick) != 0)
2305 {
2306 transport_data->connectFailCount++;
2307 result = MU_FAILURE;
2308 }
2309 else
2310 {
2311 ResetConnectionIfNecessary(transport_data);
2312
2313 if (SendMqttConnectMsg(transport_data) != 0)
2314 {
2315 transport_data->connectFailCount++;
2316 result = MU_FAILURE;
2317 }
2318 else
2319 {
2320 transport_data->mqttClientStatus = MQTT_CLIENT_STATUS_CONNECTING;
2321 transport_data->connectFailCount = 0;
2322 result = 0;
2323 }
2324 }
2325 }
2326 else if (retry_action == RETRY_ACTION_STOP_RETRYING)
2327 {
2328 // Set callback if retry expired
2329 if (!transport_data->isRetryExpiredCallbackSet)
2330 {
2331 transport_data->transport_callbacks.connection_status_cb(IOTHUB_CLIENT_CONNECTION_UNAUTHENTICATED, IOTHUB_CLIENT_CONNECTION_RETRY_EXPIRED, transport_data->transport_ctx);
2332 transport_data->isRetryExpiredCallbackSet = true;
2333 }
2334 result = MU_FAILURE;
2335 }
2336 else if (retry_action == RETRY_ACTION_RETRY_LATER)
2337 {
2338 result = MU_FAILURE;
2339 }
2340 }
2341 else if (transport_data->mqttClientStatus == MQTT_CLIENT_STATUS_EXECUTE_DISCONNECT)
2342 {
2343 // Need to disconnect from client
2344 DisconnectFromClient(transport_data);
2345 result = 0;
2346 }
2347 // Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_09_001: [ IoTHubTransport_MQTT_Common_DoWork shall trigger reconnection if the mqtt_client_connect does not complete within `keepalive` seconds]
2348 else if (transport_data->mqttClientStatus == MQTT_CLIENT_STATUS_CONNECTING)
2349 {
2350 tickcounter_ms_t current_time;
2351 if (tickcounter_get_current_ms(transport_data->msgTickCounter, &current_time) != 0)
2352 {
2353 LogError("failed verifying MQTT_CLIENT_STATUS_CONNECTING timeout");
2354 result = MU_FAILURE;
2355 }
2356 else if ((current_time - transport_data->mqtt_connect_time) / 1000 > transport_data->connect_timeout_in_sec)
2357 {
2358 LogError("mqtt_client timed out waiting for CONNACK");
2359 transport_data->currPacketState = PACKET_TYPE_ERROR;
2360 DisconnectFromClient(transport_data);
2361 result = MU_FAILURE;
2362 }
2363 }
2364 else if (transport_data->mqttClientStatus == MQTT_CLIENT_STATUS_CONNECTED)
2365 {
2366 // We are connected and not being closed, so does SAS need to reconnect?
2367 tickcounter_ms_t current_time;
2368 if (tickcounter_get_current_ms(transport_data->msgTickCounter, &current_time) != 0)
2369 {
2370 transport_data->connectFailCount++;
2371 result = MU_FAILURE;
2372 }
2373 else
2374 {
2375 IOTHUB_CREDENTIAL_TYPE cred_type = IoTHubClient_Auth_Get_Credential_Type(transport_data->authorization_module);
2376 // If the credential type is not an x509 certificate then we shall renew the Sas_Token
2377 if (cred_type != IOTHUB_CREDENTIAL_TYPE_X509 && cred_type != IOTHUB_CREDENTIAL_TYPE_X509_ECC)
2378 {
2379 size_t sas_token_expiry = IoTHubClient_Auth_Get_SasToken_Expiry(transport_data->authorization_module);
2380 if ((current_time - transport_data->mqtt_connect_time) / 1000 > (sas_token_expiry*SAS_REFRESH_MULTIPLIER))
2381 {
2382 /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_07_058: [ If the sas token has timed out IoTHubTransport_MQTT_Common_DoWork shall disconnect from the mqtt client and destroy the transport information and wait for reconnect. ] */
2383 DisconnectFromClient(transport_data);
2384
2385 transport_data->transport_callbacks.connection_status_cb(IOTHUB_CLIENT_CONNECTION_UNAUTHENTICATED, IOTHUB_CLIENT_CONNECTION_EXPIRED_SAS_TOKEN, transport_data->transport_ctx);
2386 transport_data->currPacketState = UNKNOWN_TYPE;
2387 if (transport_data->topic_MqttMessage != NULL)
2388 {
2389 transport_data->topics_ToSubscribe |= SUBSCRIBE_TELEMETRY_TOPIC;
2390 }
2391 if (transport_data->topic_GetState != NULL)
2392 {
2393 transport_data->topics_ToSubscribe |= SUBSCRIBE_GET_REPORTED_STATE_TOPIC;
2394 }
2395 if (transport_data->topic_NotifyState != NULL)
2396 {
2397 transport_data->topics_ToSubscribe |= SUBSCRIBE_NOTIFICATION_STATE_TOPIC;
2398 }
2399 if (transport_data->topic_DeviceMethods != NULL)
2400 {
2401 transport_data->topics_ToSubscribe |= SUBSCRIBE_DEVICE_METHOD_TOPIC;
2402 }
2403 if (transport_data->topic_InputQueue != NULL)
2404 {
2405 transport_data->topics_ToSubscribe |= SUBSCRIBE_INPUT_QUEUE_TOPIC;
2406 }
2407 }
2408 }
2409 }
2410 }
2411 }
2412 return result;
2413}
2414
2415static STRING_HANDLE buildConfigForUsername(const IOTHUB_CLIENT_CONFIG* upperConfig, const char* moduleId)
2416{
2417 if (moduleId == NULL)
2418 {
2419 return STRING_construct_sprintf("%s.%s/%s/?api-version=%s&DeviceClientType=", upperConfig->iotHubName, upperConfig->iotHubSuffix, upperConfig->deviceId, IOTHUB_API_VERSION);
2420 }
2421 else
2422 {
2423 return STRING_construct_sprintf("%s.%s/%s/%s/?api-version=%s&DeviceClientType=", upperConfig->iotHubName, upperConfig->iotHubSuffix, upperConfig->deviceId, moduleId, IOTHUB_API_VERSION);
2424 }
2425}
2426
2427static STRING_HANDLE buildMqttEventString(const char* device_id, const char* module_id)
2428{
2429 if (module_id == NULL)
2430 {
2431 return STRING_construct_sprintf(TOPIC_DEVICE_DEVICE, device_id);
2432 }
2433 else
2434 {
2435 return STRING_construct_sprintf(TOPIC_DEVICE_DEVICE_MODULE, device_id, module_id);
2436 }
2437}
2438
2439static STRING_HANDLE buildDevicesAndModulesPath(const IOTHUB_CLIENT_CONFIG* upperConfig, const char* moduleId)
2440{
2441 if (moduleId == NULL)
2442 {
2443 return STRING_construct_sprintf("%s.%s/devices/%s", upperConfig->iotHubName, upperConfig->iotHubSuffix, upperConfig->deviceId);
2444 }
2445 else
2446 {
2447 return STRING_construct_sprintf("%s.%s/devices/%s/modules/%s", upperConfig->iotHubName, upperConfig->iotHubSuffix, upperConfig->deviceId, moduleId);
2448 }
2449}
2450
2451static PMQTTTRANSPORT_HANDLE_DATA InitializeTransportHandleData(const IOTHUB_CLIENT_CONFIG* upperConfig, PDLIST_ENTRY waitingToSend, IOTHUB_AUTHORIZATION_HANDLE auth_module, const char* moduleId)
2452{
2453 PMQTTTRANSPORT_HANDLE_DATA state = (PMQTTTRANSPORT_HANDLE_DATA)malloc(sizeof(MQTTTRANSPORT_HANDLE_DATA));
2454 if (state == NULL)
2455 {
2456 LogError("Could not create MQTT transport state. Memory allocation failed.");
2457 }
2458 else
2459 {
2460 memset(state, 0, sizeof(MQTTTRANSPORT_HANDLE_DATA));
2461 if ((state->msgTickCounter = tickcounter_create()) == NULL)
2462 {
2463 LogError("Invalid Argument: iotHubName is empty");
2464 free_transport_handle_data(state);
2465 state = NULL;
2466 }
2467 // Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_09_005: [ MQTT transport shall use EXPONENTIAL_WITH_BACK_OFF as default retry policy ]
2468 else if ((state->retry_control_handle = retry_control_create(DEFAULT_RETRY_POLICY, DEFAULT_RETRY_TIMEOUT_IN_SECONDS)) == NULL)
2469 {
2470 LogError("Failed creating default retry control");
2471 free_transport_handle_data(state);
2472 state = NULL;
2473 }
2474 else if ((state->device_id = STRING_construct(upperConfig->deviceId)) == NULL)
2475 {
2476 LogError("failure constructing device_id.");
2477 free_transport_handle_data(state);
2478 state = NULL;
2479 }
2480 else if ((moduleId != NULL) && ((state->module_id = STRING_construct(moduleId)) == NULL))
2481 {
2482 LogError("failure constructing module_id.");
2483 free_transport_handle_data(state);
2484 state = NULL;
2485 }
2486 else if ((state->devicesAndModulesPath = buildDevicesAndModulesPath(upperConfig, moduleId)) == NULL)
2487 {
2488 LogError("failure constructing devicesPath.");
2489 free_transport_handle_data(state);
2490 state = NULL;
2491 }
2492 else
2493 {
2494 if ((state->topic_MqttEvent = buildMqttEventString(upperConfig->deviceId, moduleId)) == NULL)
2495 {
2496 LogError("Could not create topic_MqttEvent for MQTT");
2497 free_transport_handle_data(state);
2498 state = NULL;
2499 }
2500 else
2501 {
2502 state->mqttClient = mqtt_client_init(mqtt_notification_callback, mqtt_operation_complete_callback, state, mqtt_error_callback, state);
2503 if (state->mqttClient == NULL)
2504 {
2505 LogError("failure initializing mqtt client.");
2506 free_transport_handle_data(state);
2507 state = NULL;
2508 }
2509 else
2510 {
2511 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_008: [If the upperConfig contains a valid protocolGatewayHostName value the this shall be used for the hostname, otherwise the hostname shall be constructed using the iothubname and iothubSuffix.] */
2512 if (upperConfig->protocolGatewayHostName == NULL)
2513 {
2514 state->hostAddress = STRING_construct_sprintf("%s.%s", upperConfig->iotHubName, upperConfig->iotHubSuffix);
2515 }
2516 else
2517 {
2518 state->hostAddress = STRING_construct(upperConfig->protocolGatewayHostName);
2519 }
2520
2521 if (state->hostAddress == NULL)
2522 {
2523 LogError("failure constructing host address.");
2524 free_transport_handle_data(state);
2525 state = NULL;
2526 }
2527 else if ((state->configPassedThroughUsername = buildConfigForUsername(upperConfig, moduleId)) == NULL)
2528 {
2529 free_transport_handle_data(state);
2530 state = NULL;
2531 }
2532 else
2533 {
2534 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_010: [IoTHubTransport_MQTT_Common_Create shall allocate memory to save its internal state where all topics, hostname, device_id, device_key, sasTokenSr and client handle shall be saved.] */
2535 DList_InitializeListHead(&(state->telemetry_waitingForAck));
2536 DList_InitializeListHead(&(state->ack_waiting_queue));
2537 DList_InitializeListHead(&(state->pending_get_twin_queue));
2538 state->mqttClientStatus = MQTT_CLIENT_STATUS_NOT_CONNECTED;
2539 state->isRecoverableError = true;
2540 state->packetId = 1;
2541 state->waitingToSend = waitingToSend;
2542 state->currPacketState = CONNECT_TYPE;
2543 state->keepAliveValue = DEFAULT_MQTT_KEEPALIVE;
2544 state->connect_timeout_in_sec = DEFAULT_CONNACK_TIMEOUT;
2545 state->topics_ToSubscribe = UNSUBSCRIBE_FROM_TOPIC;
2546 srand((unsigned int)get_time(NULL));
2547 state->authorization_module = auth_module;
2548
2549 state->isDestroyCalled = false;
2550 state->isRetryExpiredCallbackSet = false;
2551 state->isRegistered = false;
2552 state->device_twin_get_sent = false;
2553 state->xioTransport = NULL;
2554 state->portNum = 0;
2555 state->connectFailCount = 0;
2556 state->connectTick = 0;
2557 state->topic_MqttMessage = NULL;
2558 state->topic_GetState = NULL;
2559 state->topic_NotifyState = NULL;
2560 state->topic_DeviceMethods = NULL;
2561 state->topic_InputQueue = NULL;
2562 state->log_trace = state->raw_trace = false;
2563 state->isProductInfoSet = false;
2564 state->auto_url_encode_decode = false;
2565 state->conn_attempted = false;
2566 }
2567 }
2568 }
2569 }
2570 }
2571 return state;
2572}
2573
2574TRANSPORT_LL_HANDLE IoTHubTransport_MQTT_Common_Create(const IOTHUBTRANSPORT_CONFIG* config, MQTT_GET_IO_TRANSPORT get_io_transport, TRANSPORT_CALLBACKS_INFO* cb_info, void* ctx)
2575{
2576 PMQTTTRANSPORT_HANDLE_DATA result;
2577 size_t deviceIdSize;
2578
2579 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_001: [If parameter config is NULL then IoTHubTransport_MQTT_Common_Create shall return NULL.] */
2580 /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_07_041: [ if get_io_transport is NULL then IoTHubTransport_MQTT_Common_Create shall return NULL. ] */
2581 if (config == NULL || get_io_transport == NULL || cb_info == NULL)
2582 {
2583 LogError("Invalid Argument config: %p, get_io_transport: %p, cb_info: %p", config, get_io_transport, cb_info);
2584 result = NULL;
2585 }
2586 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_002: [If the parameter config's variables upperConfig, auth_module_handle or waitingToSend are NULL then IoTHubTransport_MQTT_Common_Create shall return NULL.] */
2587 else if (config->auth_module_handle == NULL)
2588 {
2589 LogError("Invalid Argument: auth_module_handle is NULL)");
2590 result = NULL;
2591 }
2592 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_002: [If the parameter config's variables upperConfig or waitingToSend are NULL then IoTHubTransport_MQTT_Common_Create shall return NULL.] */
2593 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_003: [If the upperConfig's variables deviceId, both deviceKey and deviceSasToken, iotHubName, protocol, or iotHubSuffix are NULL then IoTHubTransport_MQTT_Common_Create shall return NULL.] */
2594 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_03_003: [If both deviceKey & deviceSasToken fields are NOT NULL then IoTHubTransport_MQTT_Common_Create shall return NULL.] */
2595 else if (config->upperConfig == NULL ||
2596 config->upperConfig->protocol == NULL ||
2597 config->upperConfig->deviceId == NULL ||
2598 ((config->upperConfig->deviceKey != NULL) && (config->upperConfig->deviceSasToken != NULL)) ||
2599 config->upperConfig->iotHubName == NULL ||
2600 config->upperConfig->iotHubSuffix == NULL)
2601 {
2602 LogError("Invalid Argument: upperConfig structure contains an invalid parameter");
2603 result = NULL;
2604 }
2605 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_002: [If the parameter config's variables upperConfig, auth_module_handle or waitingToSend are NULL then IoTHubTransport_MQTT_Common_Create shall return NULL.] */
2606 else if (config->waitingToSend == NULL)
2607 {
2608 LogError("Invalid Argument: waitingToSend is NULL)");
2609 result = NULL;
2610 }
2611 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_006: [If the upperConfig's variables deviceId is an empty strings or length is greater then 128 then IoTHubTransport_MQTT_Common_Create shall return NULL.] */
2612 else if (((deviceIdSize = strlen(config->upperConfig->deviceId)) > 128U) || (deviceIdSize == 0))
2613 {
2614 LogError("Invalid Argument: DeviceId is of an invalid size");
2615 result = NULL;
2616 }
2617 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_003: [If the upperConfig's variables deviceId, both deviceKey and deviceSasToken, iotHubName, protocol, or iotHubSuffix are NULL then IoTHubTransport_MQTT_Common_Create shall return NULL.] */
2618 else if ((config->upperConfig->deviceKey != NULL) && (strlen(config->upperConfig->deviceKey) == 0))
2619 {
2620 LogError("Invalid Argument: deviceKey is empty");
2621 result = NULL;
2622 }
2623 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_003: [If the upperConfig's variables deviceId, both deviceKey and deviceSasToken, iotHubName, protocol, or iotHubSuffix are NULL then IoTHubTransport_MQTT_Common_Create shall return NULL.] */
2624 else if ((config->upperConfig->deviceSasToken != NULL) && (strlen(config->upperConfig->deviceSasToken) == 0))
2625 {
2626 LogError("Invalid Argument: deviceSasToken is empty");
2627 result = NULL;
2628 }
2629 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_003: [If the upperConfig's variables deviceId, deviceKey, iotHubName, protocol, or iotHubSuffix are NULL then IoTHubTransport_MQTT_Common_Create shall return NULL.] */
2630 else if (strlen(config->upperConfig->iotHubName) == 0)
2631 {
2632 LogError("Invalid Argument: iotHubName is empty");
2633 result = NULL;
2634 }
2635 else if (IoTHub_Transport_ValidateCallbacks(cb_info) != 0)
2636 {
2637 LogError("failure checking transport callbacks");
2638 result = NULL;
2639 }
2640 else
2641 {
2642 result = InitializeTransportHandleData(config->upperConfig, config->waitingToSend, config->auth_module_handle, config->moduleId);
2643 if (result != NULL)
2644 {
2645 result->get_io_transport = get_io_transport;
2646 result->http_proxy_hostname = NULL;
2647 result->http_proxy_username = NULL;
2648 result->http_proxy_password = NULL;
2649
2650 result->transport_ctx = ctx;
2651 memcpy(&result->transport_callbacks, cb_info, sizeof(TRANSPORT_CALLBACKS_INFO));
2652 }
2653 }
2654 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_009: [If any error is encountered then IoTHubTransport_MQTT_Common_Create shall return NULL.] */
2655 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_011: [On Success IoTHubTransport_MQTT_Common_Create shall return a non-NULL value.] */
2656 return result;
2657}
2658
2659void IoTHubTransport_MQTT_Common_Destroy(TRANSPORT_LL_HANDLE handle)
2660{
2661 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_012: [IoTHubTransport_MQTT_Common_Destroy shall do nothing if parameter handle is NULL.] */
2662 PMQTTTRANSPORT_HANDLE_DATA transport_data = (PMQTTTRANSPORT_HANDLE_DATA)handle;
2663 if (transport_data != NULL)
2664 {
2665 transport_data->isDestroyCalled = true;
2666
2667 DisconnectFromClient(transport_data);
2668
2669 //Empty the Waiting for Ack Messages.
2670 while (!DList_IsListEmpty(&transport_data->telemetry_waitingForAck))
2671 {
2672 PDLIST_ENTRY currentEntry = DList_RemoveHeadList(&transport_data->telemetry_waitingForAck);
2673 MQTT_MESSAGE_DETAILS_LIST* mqttMsgEntry = containingRecord(currentEntry, MQTT_MESSAGE_DETAILS_LIST, entry);
2674 sendMsgComplete(mqttMsgEntry->iotHubMessageEntry, transport_data, IOTHUB_CLIENT_CONFIRMATION_BECAUSE_DESTROY);
2675 free(mqttMsgEntry);
2676 }
2677 while (!DList_IsListEmpty(&transport_data->ack_waiting_queue))
2678 {
2679 PDLIST_ENTRY currentEntry = DList_RemoveHeadList(&transport_data->ack_waiting_queue);
2680 MQTT_DEVICE_TWIN_ITEM* mqtt_device_twin = containingRecord(currentEntry, MQTT_DEVICE_TWIN_ITEM, entry);
2681
2682 if (mqtt_device_twin->userCallback == NULL)
2683 {
2684 transport_data->transport_callbacks.twin_rpt_state_complete_cb(mqtt_device_twin->iothub_msg_id, STATUS_CODE_TIMEOUT_VALUE, transport_data->transport_ctx);
2685 }
2686 else
2687 {
2688 mqtt_device_twin->userCallback(DEVICE_TWIN_UPDATE_COMPLETE, NULL, 0, mqtt_device_twin->userContext);
2689 }
2690
2691 destroy_device_twin_get_message(mqtt_device_twin);
2692 }
2693 while (!DList_IsListEmpty(&transport_data->pending_get_twin_queue))
2694 {
2695 PDLIST_ENTRY currentEntry = DList_RemoveHeadList(&transport_data->pending_get_twin_queue);
2696
2697 MQTT_DEVICE_TWIN_ITEM* mqtt_device_twin = containingRecord(currentEntry, MQTT_DEVICE_TWIN_ITEM, entry);
2698
2699 mqtt_device_twin->userCallback(DEVICE_TWIN_UPDATE_COMPLETE, NULL, 0, mqtt_device_twin->userContext);
2700
2701 destroy_device_twin_get_message(mqtt_device_twin);
2702 }
2703
2704 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_014: [IoTHubTransport_MQTT_Common_Destroy shall free all the resources currently in use.] */
2705 /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_01_012: [ `IoTHubTransport_MQTT_Common_Destroy` shall free the stored proxy options. ]*/
2706 free_transport_handle_data(transport_data);
2707 }
2708}
2709
2710int IoTHubTransport_MQTT_Common_Subscribe_DeviceTwin(TRANSPORT_LL_HANDLE handle)
2711{
2712 int result;
2713 PMQTTTRANSPORT_HANDLE_DATA transport_data = (PMQTTTRANSPORT_HANDLE_DATA)handle;
2714 if (transport_data == NULL)
2715 {
2716 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_042: [If the parameter handle is NULL than IoTHubTransport_MQTT_Common_Subscribe shall return a non-zero value.] */
2717 LogError("Invalid handle parameter. NULL.");
2718 result = MU_FAILURE;
2719 }
2720 else
2721 {
2722 if (transport_data->topic_GetState == NULL)
2723 {
2724 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_044: [IoTHubTransport_MQTT_Common_Subscribe_DeviceTwin shall construct the get state topic string and the notify state topic string.] */
2725 transport_data->topic_GetState = STRING_construct(TOPIC_GET_DESIRED_STATE);
2726 if (transport_data->topic_GetState == NULL)
2727 {
2728 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_046: [Upon failure IoTHubTransport_MQTT_Common_Subscribe_DeviceTwin shall return a non-zero value.] */
2729 LogError("Failure: unable constructing reported state topic");
2730 result = MU_FAILURE;
2731 }
2732 else
2733 {
2734 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_047: [On success IoTHubTransport_MQTT_Common_Subscribe_DeviceTwin shall return 0.] */
2735 transport_data->topics_ToSubscribe |= SUBSCRIBE_GET_REPORTED_STATE_TOPIC;
2736 result = 0;
2737 }
2738 }
2739 else
2740 {
2741 result = 0;
2742 }
2743 if (result == 0)
2744 {
2745 changeStateToSubscribeIfAllowed(transport_data);
2746 }
2747 }
2748 return result;
2749}
2750
2751void IoTHubTransport_MQTT_Common_Unsubscribe_DeviceTwin(TRANSPORT_LL_HANDLE handle)
2752{
2753 PMQTTTRANSPORT_HANDLE_DATA transport_data = (PMQTTTRANSPORT_HANDLE_DATA)handle;
2754 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_048: [If the parameter handle is NULL than IoTHubTransport_MQTT_Common_Unsubscribe_DeviceTwin shall do nothing.] */
2755 if (transport_data != NULL)
2756 {
2757 if (transport_data->topic_GetState != NULL)
2758 {
2759 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_049: [If subscribe_state is set to IOTHUB_DEVICE_TWIN_DESIRED_STATE then IoTHubTransport_MQTT_Common_Unsubscribe_DeviceTwin shall unsubscribe from the topic_GetState to the mqtt client.] */
2760 transport_data->topics_ToSubscribe &= ~SUBSCRIBE_GET_REPORTED_STATE_TOPIC;
2761 STRING_delete(transport_data->topic_GetState);
2762 transport_data->topic_GetState = NULL;
2763 }
2764 if (transport_data->topic_NotifyState != NULL)
2765 {
2766 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_050: [If subscribe_state is set to IOTHUB_DEVICE_TWIN_NOTIFICATION_STATE then IoTHubTransport_MQTT_Common_Unsubscribe_DeviceTwin shall unsubscribe from the topic_NotifyState to the mqtt client.] */
2767 transport_data->topics_ToSubscribe &= ~SUBSCRIBE_NOTIFICATION_STATE_TOPIC;
2768 STRING_delete(transport_data->topic_NotifyState);
2769 transport_data->topic_NotifyState = NULL;
2770 }
2771 }
2772 else
2773 {
2774 LogError("Invalid argument to unsubscribe (handle is NULL).");
2775 }
2776}
2777
2778IOTHUB_CLIENT_RESULT IoTHubTransport_MQTT_Common_GetTwinAsync(IOTHUB_DEVICE_HANDLE handle, IOTHUB_CLIENT_DEVICE_TWIN_CALLBACK completionCallback, void* callbackContext)
2779{
2780 IOTHUB_CLIENT_RESULT result;
2781
2782 // Codes_SRS_IOTHUB_MQTT_TRANSPORT_09_001: [ If `handle` or `completionCallback` are `NULL` than `IoTHubTransport_MQTT_Common_GetTwinAsync` shall return IOTHUB_CLIENT_INVALID_ARG. ]
2783 if (handle == NULL || completionCallback == NULL)
2784 {
2785 LogError("Invalid argument (handle=%p, completionCallback=%p)", handle, completionCallback);
2786 result = IOTHUB_CLIENT_INVALID_ARG;
2787 }
2788 else
2789 {
2790 PMQTTTRANSPORT_HANDLE_DATA transport_data = (PMQTTTRANSPORT_HANDLE_DATA)handle;
2791 MQTT_DEVICE_TWIN_ITEM* mqtt_info;
2792
2793 if ((mqtt_info = create_device_twin_get_message(transport_data)) == NULL)
2794 {
2795 LogError("Failed creating the device twin get request message");
2796 // Codes_SRS_IOTHUB_MQTT_TRANSPORT_09_003: [ If any failure occurs, IoTHubTransport_MQTT_Common_GetTwinAsync shall return IOTHUB_CLIENT_ERROR ]
2797 result = IOTHUB_CLIENT_ERROR;
2798 }
2799 else if (tickcounter_get_current_ms(transport_data->msgTickCounter, &mqtt_info->msgEnqueueTime) != 0)
2800 {
2801 LogError("Failed setting the get twin request enqueue time");
2802 destroy_device_twin_get_message(mqtt_info);
2803 // Codes_SRS_IOTHUB_MQTT_TRANSPORT_09_003: [ If any failure occurs, IoTHubTransport_MQTT_Common_GetTwinAsync shall return IOTHUB_CLIENT_ERROR ]
2804 result = IOTHUB_CLIENT_ERROR;
2805 }
2806 else
2807 {
2808 mqtt_info->userCallback = completionCallback;
2809 mqtt_info->userContext = callbackContext;
2810
2811 // Codes_SRS_IOTHUB_MQTT_TRANSPORT_09_002: [ The request shall be queued to be sent when the transport is connected, through DoWork ]
2812 DList_InsertTailList(&transport_data->pending_get_twin_queue, &mqtt_info->entry);
2813
2814 // Codes_SRS_IOTHUB_MQTT_TRANSPORT_09_004: [ If no failure occurs, IoTHubTransport_MQTT_Common_GetTwinAsync shall return IOTHUB_CLIENT_OK ]
2815 result = IOTHUB_CLIENT_OK;
2816 }
2817 }
2818
2819 return result;
2820}
2821
2822int IoTHubTransport_MQTT_Common_Subscribe_DeviceMethod(IOTHUB_DEVICE_HANDLE handle)
2823{
2824 int result;
2825 PMQTTTRANSPORT_HANDLE_DATA transport_data = (PMQTTTRANSPORT_HANDLE_DATA)handle;
2826
2827 if (transport_data == NULL)
2828 {
2829 /*Codes_SRS_IOTHUB_MQTT_TRANSPORT_12_001 : [If the parameter handle is NULL than IoTHubTransport_MQTT_Common_Subscribe_DeviceMethod shall return a non - zero value.]*/
2830 LogError("Invalid handle parameter. NULL.");
2831 result = MU_FAILURE;
2832 }
2833 else
2834 {
2835 if (transport_data->topic_DeviceMethods == NULL)
2836 {
2837 /*Codes_SRS_IOTHUB_MQTT_TRANSPORT_12_004 : [IoTHubTransport_MQTT_Common_Subscribe_DeviceMethod shall construct the DEVICE_METHOD topic string for subscribe.]*/
2838 transport_data->topic_DeviceMethods = STRING_construct(TOPIC_DEVICE_METHOD_SUBSCRIBE);
2839 if (transport_data->topic_DeviceMethods == NULL)
2840 {
2841 /*Codes_SRS_IOTHUB_MQTT_TRANSPORT_12_006 : [Upon failure IoTHubTransport_MQTT_Common_Subscribe_DeviceMethod shall return a non - zero value.]*/
2842 LogError("Failure: unable constructing device method subscribe topic");
2843 result = MU_FAILURE;
2844 }
2845 else
2846 {
2847 /*Codes_SRS_IOTHUB_MQTT_TRANSPORT_12_003 : [IoTHubTransport_MQTT_Common_Subscribe_DeviceMethod shall set the signaling flag for DEVICE_METHOD topic for the receiver's topic list. ]*/
2848 transport_data->topics_ToSubscribe |= SUBSCRIBE_DEVICE_METHOD_TOPIC;
2849 result = 0;
2850 }
2851 }
2852 else
2853 {
2854 /*Codes_SRS_IOTHUB_MQTT_TRANSPORT_12_002 : [If the MQTT transport has been previously subscribed to DEVICE_METHOD topic IoTHubTransport_MQTT_Common_Subscribe_DeviceMethod shall do nothing and return 0.]*/
2855 result = 0;
2856 }
2857
2858 if (result == 0)
2859 {
2860 /*Codes_SRS_IOTHUB_MQTT_TRANSPORT_12_005 : [IoTHubTransport_MQTT_Common_Subscribe_DeviceMethod shall schedule the send of the subscription.]*/
2861 /*Codes_SRS_IOTHUB_MQTT_TRANSPORT_12_007 : [On success IoTHubTransport_MQTT_Common_Subscribe_DeviceMethod shall return 0.]*/
2862 changeStateToSubscribeIfAllowed(transport_data);
2863 }
2864 }
2865 return result;
2866}
2867
2868void IoTHubTransport_MQTT_Common_Unsubscribe_DeviceMethod(TRANSPORT_LL_HANDLE handle)
2869{
2870 PMQTTTRANSPORT_HANDLE_DATA transport_data = (PMQTTTRANSPORT_HANDLE_DATA)handle;
2871 /*Codes_SRS_IOTHUB_MQTT_TRANSPORT_12_008 : [If the parameter handle is NULL than IoTHubTransport_MQTT_Common_Unsubscribe_DeviceMethod shall do nothing and return.]*/
2872 if (transport_data != NULL)
2873 {
2874 /*Codes_SRS_IOTHUB_MQTT_TRANSPORT_12_009 : [If the MQTT transport has not been subscribed to DEVICE_METHOD topic IoTHubTransport_MQTT_Common_Unsubscribe_DeviceMethod shall do nothing and return.]*/
2875 if (transport_data->topic_DeviceMethods != NULL)
2876 {
2877 /*Codes_SRS_IOTHUB_MQTT_TRANSPORT_12_010 : [IoTHubTransport_MQTT_Common_Unsubscribe_DeviceMethod shall construct the DEVICE_METHOD topic string for unsubscribe.]*/
2878 const char* unsubscribe[1];
2879 unsubscribe[0] = STRING_c_str(transport_data->topic_DeviceMethods);
2880
2881 /*Codes_SRS_IOTHUB_MQTT_TRANSPORT_12_011 : [IoTHubTransport_MQTT_Common_Unsubscribe_DeviceMethod shall send the unsubscribe.]*/
2882 if (mqtt_client_unsubscribe(transport_data->mqttClient, get_next_packet_id(transport_data), unsubscribe, 1) != 0)
2883 {
2884 LogError("Failure calling mqtt_client_unsubscribe");
2885 }
2886
2887 /*Codes_SRS_IOTHUB_MQTT_TRANSPORT_12_012 : [IoTHubTransport_MQTT_Common_Unsubscribe_DeviceMethod shall removes the signaling flag for DEVICE_METHOD topic from the receiver's topic list. ]*/
2888 STRING_delete(transport_data->topic_DeviceMethods);
2889 transport_data->topic_DeviceMethods = NULL;
2890 transport_data->topics_ToSubscribe &= ~SUBSCRIBE_DEVICE_METHOD_TOPIC;
2891 }
2892 }
2893 else
2894 {
2895 LogError("Invalid argument to unsubscribe (NULL).");
2896 }
2897}
2898
2899int IoTHubTransport_MQTT_Common_DeviceMethod_Response(TRANSPORT_LL_HANDLE handle, METHOD_HANDLE methodId, const unsigned char* response, size_t respSize, int status)
2900{
2901 int result;
2902 MQTTTRANSPORT_HANDLE_DATA* transport_data = (MQTTTRANSPORT_HANDLE_DATA*)handle;
2903 if (transport_data != NULL)
2904 {
2905 /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_07_042: [ IoTHubTransport_MQTT_Common_DeviceMethod_Response shall publish an mqtt message for the device method response. ] */
2906 DEVICE_METHOD_INFO* dev_method_info = (DEVICE_METHOD_INFO*)methodId;
2907 if (dev_method_info == NULL)
2908 {
2909 LogError("Failure: DEVICE_METHOD_INFO was NULL");
2910 result = MU_FAILURE;
2911 }
2912 else
2913 {
2914 if (publish_device_method_message(transport_data, status, dev_method_info->request_id, response, respSize) != 0)
2915 {
2916 /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_07_051: [ If any error is encountered, IoTHubTransport_MQTT_Common_DeviceMethod_Response shall return a non-zero value. ] */
2917 LogError("Failure: publishing device method response");
2918 result = MU_FAILURE;
2919 }
2920 else
2921 {
2922 result = 0;
2923 }
2924 STRING_delete(dev_method_info->request_id);
2925 free(dev_method_info);
2926 }
2927 }
2928 else
2929 {
2930 /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_07_041: [ If the parameter handle is NULL than IoTHubTransport_MQTT_Common_DeviceMethod_Response shall return a non-zero value. ] */
2931 result = MU_FAILURE;
2932 LogError("Failure: invalid TRANSPORT_LL_HANDLE parameter specified");
2933 }
2934 return result;
2935}
2936
2937static STRING_HANDLE buildTopicMqttMessage(const char* device_id, const char* module_id)
2938{
2939 if (module_id == NULL)
2940 {
2941 return STRING_construct_sprintf(TOPIC_DEVICE_MSG, device_id);
2942 }
2943 else
2944 {
2945 return STRING_construct_sprintf(TOPIC_DEVICE_MODULE_MSG, device_id, module_id);
2946 }
2947}
2948
2949int IoTHubTransport_MQTT_Common_Subscribe(TRANSPORT_LL_HANDLE handle)
2950{
2951 int result;
2952 PMQTTTRANSPORT_HANDLE_DATA transport_data = (PMQTTTRANSPORT_HANDLE_DATA)handle;
2953 if (transport_data == NULL)
2954 {
2955 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_015: [If parameter handle is NULL than IoTHubTransport_MQTT_Common_Subscribe shall return a non-zero value.] */
2956 LogError("Invalid handle parameter. NULL.");
2957 result = MU_FAILURE;
2958 }
2959 else
2960 {
2961 /* Code_SRS_IOTHUB_MQTT_TRANSPORT_07_016: [IoTHubTransport_MQTT_Common_Subscribe shall set a flag to enable mqtt_client_subscribe to be called to subscribe to the Message Topic.] */
2962 transport_data->topic_MqttMessage = buildTopicMqttMessage(STRING_c_str(transport_data->device_id), STRING_c_str(transport_data->module_id));
2963 if (transport_data->topic_MqttMessage == NULL)
2964 {
2965 LogError("Failure constructing Message Topic");
2966 result = MU_FAILURE;
2967 }
2968 else
2969 {
2970 transport_data->topics_ToSubscribe |= SUBSCRIBE_TELEMETRY_TOPIC;
2971 /* Code_SRS_IOTHUB_MQTT_TRANSPORT_07_035: [If current packet state is not CONNACT, DISCONNECT_TYPE, or PACKET_TYPE_ERROR then IoTHubTransport_MQTT_Common_Subscribe shall set the packet state to SUBSCRIBE_TYPE.]*/
2972 changeStateToSubscribeIfAllowed(transport_data);
2973 result = 0;
2974 }
2975 }
2976 return result;
2977}
2978
2979void IoTHubTransport_MQTT_Common_Unsubscribe(TRANSPORT_LL_HANDLE handle)
2980{
2981 PMQTTTRANSPORT_HANDLE_DATA transport_data = (PMQTTTRANSPORT_HANDLE_DATA)handle;
2982 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_019: [If parameter handle is NULL then IoTHubTransport_MQTT_Common_Unsubscribe shall do nothing.] */
2983 if (transport_data != NULL)
2984 {
2985 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_020: [IoTHubTransport_MQTT_Common_Unsubscribe shall call mqtt_client_unsubscribe to unsubscribe the mqtt message topic.] */
2986 const char* unsubscribe[1];
2987 unsubscribe[0] = STRING_c_str(transport_data->topic_MqttMessage);
2988 if (mqtt_client_unsubscribe(transport_data->mqttClient, get_next_packet_id(transport_data), unsubscribe, 1) != 0)
2989 {
2990 LogError("Failure calling mqtt_client_unsubscribe");
2991 }
2992 STRING_delete(transport_data->topic_MqttMessage);
2993 transport_data->topic_MqttMessage = NULL;
2994 transport_data->topics_ToSubscribe &= ~SUBSCRIBE_TELEMETRY_TOPIC;
2995 }
2996 else
2997 {
2998 LogError("Invalid argument to unsubscribe (NULL).");
2999 }
3000}
3001
3002IOTHUB_PROCESS_ITEM_RESULT IoTHubTransport_MQTT_Common_ProcessItem(TRANSPORT_LL_HANDLE handle, IOTHUB_IDENTITY_TYPE item_type, IOTHUB_IDENTITY_INFO* iothub_item)
3003{
3004 IOTHUB_PROCESS_ITEM_RESULT result;
3005 /* Codes_SRS_IOTHUBCLIENT_LL_07_001: [ If handle or iothub_item are NULL then IoTHubTransport_MQTT_Common_ProcessItem shall return IOTHUB_PROCESS_ERROR. ]*/
3006 if (handle == NULL || iothub_item == NULL)
3007 {
3008 LogError("Invalid handle parameter iothub_item=%p", iothub_item);
3009 result = IOTHUB_PROCESS_ERROR;
3010 }
3011 else
3012 {
3013 PMQTTTRANSPORT_HANDLE_DATA transport_data = (PMQTTTRANSPORT_HANDLE_DATA)handle;
3014
3015 if (transport_data->currPacketState == PUBLISH_TYPE)
3016 {
3017 // Ensure the reported property suback has been received
3018 if (item_type == IOTHUB_TYPE_DEVICE_TWIN && transport_data->twin_resp_sub_recv)
3019 {
3020 MQTT_DEVICE_TWIN_ITEM* mqtt_info = (MQTT_DEVICE_TWIN_ITEM*)malloc(sizeof(MQTT_DEVICE_TWIN_ITEM));
3021 if (mqtt_info == NULL)
3022 {
3023 /* Codes_SRS_IOTHUBCLIENT_LL_07_004: [ If any errors are encountered IoTHubTransport_MQTT_Common_ProcessItem shall return IOTHUB_PROCESS_ERROR. ]*/
3024 result = IOTHUB_PROCESS_ERROR;
3025 }
3026 else
3027 {
3028 /*Codes_SRS_IOTHUBCLIENT_LL_07_003: [ IoTHubTransport_MQTT_Common_ProcessItem shall publish a message to the mqtt protocol with the message topic for the message type.]*/
3029 mqtt_info->iothub_type = item_type;
3030 mqtt_info->iothub_msg_id = iothub_item->device_twin->item_id;
3031 mqtt_info->retryCount = 0;
3032 mqtt_info->userCallback = NULL;
3033 mqtt_info->userContext = NULL;
3034 mqtt_info->msgEnqueueTime = 0;
3035
3036 /* Codes_SRS_IOTHUBCLIENT_LL_07_005: [ If successful IoTHubTransport_MQTT_Common_ProcessItem shall add mqtt info structure acknowledgement queue. ] */
3037 DList_InsertTailList(&transport_data->ack_waiting_queue, &mqtt_info->entry);
3038
3039 if (publish_device_twin_message(transport_data, iothub_item->device_twin, mqtt_info) != 0)
3040 {
3041 DList_RemoveEntryList(&mqtt_info->entry);
3042
3043 free(mqtt_info);
3044 /* Codes_SRS_IOTHUBCLIENT_LL_07_004: [ If any errors are encountered IoTHubTransport_MQTT_Common_ProcessItem shall return IOTHUB_PROCESS_ERROR. ]*/
3045 result = IOTHUB_PROCESS_ERROR;
3046 }
3047 else
3048 {
3049 result = IOTHUB_PROCESS_OK;
3050 }
3051 }
3052 }
3053 else
3054 {
3055 /* Codes_SRS_IOTHUBCLIENT_LL_07_006: [ If the item_type is not a supported type IoTHubTransport_MQTT_Common_ProcessItem shall return IOTHUB_PROCESS_CONTINUE. ]*/
3056 result = IOTHUB_PROCESS_CONTINUE;
3057 }
3058 }
3059 else
3060 {
3061 /* Codes_SRS_IOTHUBCLIENT_LL_07_002: [ If the mqtt is not ready to publish messages IoTHubTransport_MQTT_Common_ProcessItem shall return IOTHUB_PROCESS_NOT_CONNECTED. ] */
3062 result = IOTHUB_PROCESS_NOT_CONNECTED;
3063 }
3064 }
3065 return result;
3066}
3067
3068/* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_054: [ IoTHubTransport_MQTT_Common_DoWork shall subscribe to the Notification and get_state Topics if they are defined. ] */
3069void IoTHubTransport_MQTT_Common_DoWork(TRANSPORT_LL_HANDLE handle)
3070{
3071 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_026: [IoTHubTransport_MQTT_Common_DoWork shall do nothing if parameter handle and/or iotHubClientHandle is NULL.] */
3072 PMQTTTRANSPORT_HANDLE_DATA transport_data = (PMQTTTRANSPORT_HANDLE_DATA)handle;
3073 if (transport_data != NULL)
3074 {
3075 if (InitializeConnection(transport_data) == 0)
3076 {
3077 if (transport_data->mqttClientStatus == MQTT_CLIENT_STATUS_PENDING_CLOSE)
3078 {
3079 mqtt_client_disconnect(transport_data->mqttClient, NULL, NULL);
3080 transport_data->mqttClientStatus = MQTT_CLIENT_STATUS_NOT_CONNECTED;
3081 }
3082 else if (transport_data->currPacketState == CONNACK_TYPE || transport_data->currPacketState == SUBSCRIBE_TYPE)
3083 {
3084 SubscribeToMqttProtocol(transport_data);
3085 }
3086 else if (transport_data->currPacketState == SUBACK_TYPE)
3087 {
3088 if ((transport_data->topic_NotifyState != NULL || transport_data->topic_GetState != NULL) &&
3089 !transport_data->device_twin_get_sent)
3090 {
3091 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_055: [ IoTHubTransport_MQTT_Common_DoWork shall send a device twin get property message upon successfully retrieving a SUBACK on device twin topics. ] */
3092 MQTT_DEVICE_TWIN_ITEM* mqtt_info;
3093
3094 if ((mqtt_info = create_device_twin_get_message(transport_data)) == NULL)
3095 {
3096 LogError("Failure: could not create message for twin get command");
3097 }
3098 else if (publish_device_twin_get_message(transport_data, mqtt_info) == 0)
3099 {
3100 transport_data->device_twin_get_sent = true;
3101 }
3102 else
3103 {
3104 LogError("Failure: sending device twin get property command.");
3105 destroy_device_twin_get_message(mqtt_info);
3106 }
3107 }
3108 // Publish can be called now
3109 transport_data->currPacketState = PUBLISH_TYPE;
3110 }
3111 else if (transport_data->currPacketState == PUBLISH_TYPE)
3112 {
3113 PDLIST_ENTRY currentListEntry = transport_data->waitingToSend->Flink;
3114 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_027: [IoTHubTransport_MQTT_Common_DoWork shall inspect the "waitingToSend" DLIST passed in config structure.] */
3115 while (currentListEntry != transport_data->waitingToSend)
3116 {
3117 IOTHUB_MESSAGE_LIST* iothubMsgList = containingRecord(currentListEntry, IOTHUB_MESSAGE_LIST, entry);
3118 DLIST_ENTRY savedFromCurrentListEntry;
3119 savedFromCurrentListEntry.Flink = currentListEntry->Flink;
3120
3121 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_027: [IoTHubTransport_MQTT_Common_DoWork shall inspect the "waitingToSend" DLIST passed in config structure.] */
3122 size_t messageLength;
3123 const unsigned char* messagePayload = NULL;
3124 if (!RetrieveMessagePayload(iothubMsgList->messageHandle, &messagePayload, &messageLength))
3125 {
3126 (void)(DList_RemoveEntryList(currentListEntry));
3127 sendMsgComplete(iothubMsgList, transport_data, IOTHUB_CLIENT_CONFIRMATION_ERROR);
3128 LogError("Failure result from IoTHubMessage_GetData");
3129 }
3130 else
3131 {
3132 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_029: [IoTHubTransport_MQTT_Common_DoWork shall create a MQTT_MESSAGE_HANDLE and pass this to a call to mqtt_client_publish.] */
3133 MQTT_MESSAGE_DETAILS_LIST* mqttMsgEntry = (MQTT_MESSAGE_DETAILS_LIST*)malloc(sizeof(MQTT_MESSAGE_DETAILS_LIST));
3134 if (mqttMsgEntry == NULL)
3135 {
3136 LogError("Allocation Error: Failure allocating MQTT Message Detail List.");
3137 }
3138 else
3139 {
3140 mqttMsgEntry->retryCount = 0;
3141 mqttMsgEntry->iotHubMessageEntry = iothubMsgList;
3142 mqttMsgEntry->packet_id = get_next_packet_id(transport_data);
3143 if (publish_mqtt_telemetry_msg(transport_data, mqttMsgEntry, messagePayload, messageLength) != 0)
3144 {
3145 (void)(DList_RemoveEntryList(currentListEntry));
3146 sendMsgComplete(iothubMsgList, transport_data, IOTHUB_CLIENT_CONFIRMATION_ERROR);
3147 free(mqttMsgEntry);
3148 }
3149 else
3150 {
3151 // Remove the message from the waiting queue ...
3152 (void)(DList_RemoveEntryList(currentListEntry));
3153 // and add it to the ack queue
3154 DList_InsertTailList(&(transport_data->telemetry_waitingForAck), &(mqttMsgEntry->entry));
3155 }
3156 }
3157 }
3158 currentListEntry = savedFromCurrentListEntry.Flink;
3159 }
3160
3161 sendPendingGetTwinRequests(transport_data);
3162 }
3163 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_030: [IoTHubTransport_MQTT_Common_DoWork shall call mqtt_client_dowork everytime it is called if it is connected.] */
3164 mqtt_client_dowork(transport_data->mqttClient);
3165 }
3166
3167 // Check the ack messages timeouts
3168 process_queued_ack_messages(transport_data);
3169 removeExpiredPendingGetTwinRequests(transport_data);
3170 removeExpiredGetTwinRequestsPendingAck(transport_data);
3171 }
3172}
3173
3174IOTHUB_CLIENT_RESULT IoTHubTransport_MQTT_Common_GetSendStatus(TRANSPORT_LL_HANDLE handle, IOTHUB_CLIENT_STATUS *iotHubClientStatus)
3175{
3176 IOTHUB_CLIENT_RESULT result;
3177
3178 if (handle == NULL || iotHubClientStatus == NULL)
3179 {
3180 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_023: [IoTHubTransport_MQTT_Common_GetSendStatus shall return IOTHUB_CLIENT_INVALID_ARG if called with NULL parameter.] */
3181 LogError("invalid arument.");
3182 result = IOTHUB_CLIENT_INVALID_ARG;
3183 }
3184 else
3185 {
3186 MQTTTRANSPORT_HANDLE_DATA* handleData = (MQTTTRANSPORT_HANDLE_DATA*)handle;
3187 if (!DList_IsListEmpty(handleData->waitingToSend) || !DList_IsListEmpty(&(handleData->telemetry_waitingForAck)))
3188 {
3189 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_025: [IoTHubTransport_MQTT_Common_GetSendStatus shall return IOTHUB_CLIENT_OK and status IOTHUB_CLIENT_SEND_STATUS_BUSY if there are currently event items to be sent or being sent.] */
3190 *iotHubClientStatus = IOTHUB_CLIENT_SEND_STATUS_BUSY;
3191 }
3192 else
3193 {
3194 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_024: [IoTHubTransport_MQTT_Common_GetSendStatus shall return IOTHUB_CLIENT_OK and status IOTHUB_CLIENT_SEND_STATUS_IDLE if there are currently no event items to be sent or being sent.] */
3195 *iotHubClientStatus = IOTHUB_CLIENT_SEND_STATUS_IDLE;
3196 }
3197 result = IOTHUB_CLIENT_OK;
3198 }
3199 return result;
3200}
3201
3202IOTHUB_CLIENT_RESULT IoTHubTransport_MQTT_Common_SetOption(TRANSPORT_LL_HANDLE handle, const char* option, const void* value)
3203{
3204 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_021: [If any parameter is NULL then IoTHubTransport_MQTT_Common_SetOption shall return IOTHUB_CLIENT_INVALID_ARG.] */
3205 IOTHUB_CLIENT_RESULT result;
3206 if (
3207 (handle == NULL) ||
3208 (option == NULL) ||
3209 (value == NULL)
3210 )
3211 {
3212 result = IOTHUB_CLIENT_INVALID_ARG;
3213 LogError("invalid parameter (NULL) passed to IoTHubTransport_MQTT_Common_SetOption.");
3214 }
3215 else
3216 {
3217 MQTTTRANSPORT_HANDLE_DATA* transport_data = (MQTTTRANSPORT_HANDLE_DATA*)handle;
3218
3219 IOTHUB_CREDENTIAL_TYPE cred_type = IoTHubClient_Auth_Get_Credential_Type(transport_data->authorization_module);
3220
3221 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_031: [If the option parameter is set to "logtrace" then the value shall be a bool_ptr and the value will determine if the mqtt client log is on or off.] */
3222 if (strcmp(OPTION_LOG_TRACE, option) == 0)
3223 {
3224 transport_data->log_trace = *((bool*)value);
3225 mqtt_client_set_trace(transport_data->mqttClient, transport_data->log_trace, transport_data->raw_trace);
3226 result = IOTHUB_CLIENT_OK;
3227 }
3228 else if (strcmp("rawlogtrace", option) == 0)
3229 {
3230 transport_data->raw_trace = *((bool*)value);
3231 mqtt_client_set_trace(transport_data->mqttClient, transport_data->log_trace, transport_data->raw_trace);
3232 result = IOTHUB_CLIENT_OK;
3233 }
3234 else if (strcmp(OPTION_AUTO_URL_ENCODE_DECODE, option) == 0)
3235 {
3236 transport_data->auto_url_encode_decode = *((bool*)value);
3237 result = IOTHUB_CLIENT_OK;
3238 }
3239 else if (strcmp(OPTION_CONNECTION_TIMEOUT, option) == 0)
3240 {
3241 int* connection_time = (int*)value;
3242 if (*connection_time != transport_data->connect_timeout_in_sec)
3243 {
3244 transport_data->connect_timeout_in_sec = (uint16_t)(*connection_time);
3245 }
3246 result = IOTHUB_CLIENT_OK;
3247 }
3248 else if (strcmp(OPTION_KEEP_ALIVE, option) == 0)
3249 {
3250 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_036: [If the option parameter is set to "keepalive" then the value shall be a int_ptr and the value will determine the mqtt keepalive time that is set for pings.] */
3251 int* keepAliveOption = (int*)value;
3252 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_037 : [If the option parameter is set to supplied int_ptr keepalive is the same value as the existing keepalive then IoTHubTransport_MQTT_Common_SetOption shall do nothing.] */
3253 if (*keepAliveOption != transport_data->keepAliveValue)
3254 {
3255 transport_data->keepAliveValue = (uint16_t)(*keepAliveOption);
3256 if (transport_data->mqttClientStatus != MQTT_CLIENT_STATUS_NOT_CONNECTED)
3257 {
3258 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_038: [If the client is connected when the keepalive is set then IoTHubTransport_MQTT_Common_SetOption shall disconnect and reconnect with the specified keepalive value.] */
3259 DisconnectFromClient(transport_data);
3260 }
3261 }
3262 result = IOTHUB_CLIENT_OK;
3263 }
3264 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_039: [If the option parameter is set to "x509certificate" then the value shall be a const char of the certificate to be used for x509.] */
3265 else if ((strcmp(OPTION_X509_CERT, option) == 0) && (cred_type != IOTHUB_CREDENTIAL_TYPE_X509 && cred_type != IOTHUB_CREDENTIAL_TYPE_UNKNOWN))
3266 {
3267 LogError("x509certificate specified, but authentication method is not x509");
3268 result = IOTHUB_CLIENT_INVALID_ARG;
3269 }
3270 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_040: [If the option parameter is set to "x509privatekey" then the value shall be a const char of the RSA Private Key to be used for x509.] */
3271 else if ((strcmp(OPTION_X509_PRIVATE_KEY, option) == 0) && (cred_type != IOTHUB_CREDENTIAL_TYPE_X509 && cred_type != IOTHUB_CREDENTIAL_TYPE_UNKNOWN))
3272 {
3273 LogError("x509privatekey specified, but authentication method is not x509");
3274 result = IOTHUB_CLIENT_INVALID_ARG;
3275 }
3276 else if (strcmp(OPTION_HTTP_PROXY, option) == 0)
3277 {
3278 /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_01_001: [ If `option` is `proxy_data`, `value` shall be used as an `HTTP_PROXY_OPTIONS*`. ]*/
3279 HTTP_PROXY_OPTIONS* proxy_options = (HTTP_PROXY_OPTIONS*)value;
3280
3281 if (transport_data->xioTransport != NULL)
3282 {
3283 /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_01_007: [ If the underlying IO has already been created, then `IoTHubTransport_MQTT_Common_SetOption` shall fail and return `IOTHUB_CLIENT_ERROR`. ]*/
3284 LogError("Cannot set proxy option once the underlying IO is created");
3285 result = IOTHUB_CLIENT_ERROR;
3286 }
3287 else if (proxy_options->host_address == NULL)
3288 {
3289 /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_01_003: [ If `host_address` is NULL, `IoTHubTransport_MQTT_Common_SetOption` shall fail and return `IOTHUB_CLIENT_INVALID_ARG`. ]*/
3290 LogError("NULL host_address in proxy options");
3291 result = IOTHUB_CLIENT_INVALID_ARG;
3292 }
3293 /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_01_006: [ If only one of `username` and `password` is NULL, `IoTHubTransport_MQTT_Common_SetOption` shall fail and return `IOTHUB_CLIENT_INVALID_ARG`. ]*/
3294 else if (((proxy_options->username == NULL) || (proxy_options->password == NULL)) &&
3295 (proxy_options->username != proxy_options->password))
3296 {
3297 LogError("Only one of username and password for proxy settings was NULL");
3298 result = IOTHUB_CLIENT_INVALID_ARG;
3299 }
3300 else
3301 {
3302 char* copied_proxy_hostname = NULL;
3303 char* copied_proxy_username = NULL;
3304 char* copied_proxy_password = NULL;
3305
3306 /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_01_002: [ The fields `host_address`, `port`, `username` and `password` shall be saved for later used (needed when creating the underlying IO to be used by the transport). ]*/
3307 transport_data->http_proxy_port = proxy_options->port;
3308 if (mallocAndStrcpy_s(&copied_proxy_hostname, proxy_options->host_address) != 0)
3309 {
3310 /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_01_004: [ If copying `host_address`, `username` or `password` fails, `IoTHubTransport_MQTT_Common_SetOption` shall fail and return `IOTHUB_CLIENT_ERROR`. ]*/
3311 LogError("Cannot copy HTTP proxy hostname");
3312 result = IOTHUB_CLIENT_ERROR;
3313 }
3314 /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_01_005: [ `username` and `password` shall be allowed to be NULL. ]*/
3315 else if ((proxy_options->username != NULL) && (mallocAndStrcpy_s(&copied_proxy_username, proxy_options->username) != 0))
3316 {
3317 /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_01_004: [ If copying `host_address`, `username` or `password` fails, `IoTHubTransport_MQTT_Common_SetOption` shall fail and return `IOTHUB_CLIENT_ERROR`. ]*/
3318 free(copied_proxy_hostname);
3319 LogError("Cannot copy HTTP proxy username");
3320 result = IOTHUB_CLIENT_ERROR;
3321 }
3322 /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_01_005: [ `username` and `password` shall be allowed to be NULL. ]*/
3323 else if ((proxy_options->password != NULL) && (mallocAndStrcpy_s(&copied_proxy_password, proxy_options->password) != 0))
3324 {
3325 /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_01_004: [ If copying `host_address`, `username` or `password` fails, `IoTHubTransport_MQTT_Common_SetOption` shall fail and return `IOTHUB_CLIENT_ERROR`. ]*/
3326 if (copied_proxy_username != NULL)
3327 {
3328 free(copied_proxy_username);
3329 }
3330 free(copied_proxy_hostname);
3331 LogError("Cannot copy HTTP proxy password");
3332 result = IOTHUB_CLIENT_ERROR;
3333 }
3334 else
3335 {
3336 /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_01_009: [ When setting the proxy options succeeds any previously saved proxy options shall be freed. ]*/
3337 free_proxy_data(transport_data);
3338
3339 transport_data->http_proxy_hostname = copied_proxy_hostname;
3340 transport_data->http_proxy_username = copied_proxy_username;
3341 transport_data->http_proxy_password = copied_proxy_password;
3342
3343 /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_01_008: [ If setting the `proxy_data` option suceeds, `IoTHubTransport_MQTT_Common_SetOption` shall return `IOTHUB_CLIENT_OK` ]*/
3344 result = IOTHUB_CLIENT_OK;
3345 }
3346 }
3347 }
3348 else
3349 {
3350 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_039: [If the option parameter is set to "x509certificate" then the value shall be a const char of the certificate to be used for x509.] */
3351 if (((strcmp(OPTION_X509_CERT, option) == 0) || (strcmp(OPTION_X509_PRIVATE_KEY, option) == 0)) && (cred_type != IOTHUB_CREDENTIAL_TYPE_X509))
3352 {
3353 IoTHubClient_Auth_Set_x509_Type(transport_data->authorization_module, true);
3354 }
3355
3356 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_032: [IoTHubTransport_MQTT_Common_SetOption shall pass down the option to xio_setoption if the option parameter is not a known option string for the MQTT transport.] */
3357 if (GetTransportProviderIfNecessary(transport_data) == 0)
3358 {
3359 if (xio_setoption(transport_data->xioTransport, option, value) == 0)
3360 {
3361 result = IOTHUB_CLIENT_OK;
3362 }
3363 else
3364 {
3365 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_132: [IoTHubTransport_MQTT_Common_SetOption shall return IOTHUB_CLIENT_INVALID_ARG xio_setoption fails] */
3366 result = IOTHUB_CLIENT_INVALID_ARG;
3367 }
3368 }
3369 else
3370 {
3371 result = IOTHUB_CLIENT_ERROR;
3372 }
3373 }
3374 }
3375 return result;
3376}
3377
/*
 * Tells whether the module id stored on the transport and the module id
 * supplied for a device refer to the same module.
 *
 * Either argument may be NULL. Two NULL ids are considered equal, a NULL id
 * never equals a non-NULL one, and two non-NULL ids are compared with strcmp.
 *
 * Returns true when the ids are equal, false otherwise.
 */
static bool check_module_ids_equal(const char* transportModuleId, const char* deviceModuleId)
{
    bool ids_match;

    if ((transportModuleId == NULL) || (deviceModuleId == NULL))
    {
        // At least one id is absent: they match only when both are absent.
        ids_match = (transportModuleId == deviceModuleId);
    }
    else
    {
        ids_match = (strcmp(transportModuleId, deviceModuleId) == 0);
    }

    return ids_match;
}
3397
3398TRANSPORT_LL_HANDLE IoTHubTransport_MQTT_Common_Register(TRANSPORT_LL_HANDLE handle, const IOTHUB_DEVICE_CONFIG* device, PDLIST_ENTRY waitingToSend)
3399{
3400 TRANSPORT_LL_HANDLE result = NULL;
3401
3402 // Codes_SRS_IOTHUB_MQTT_TRANSPORT_17_001: [ IoTHubTransport_MQTT_Common_Register shall return NULL if the TRANSPORT_LL_HANDLE is NULL.]
3403 // Codes_SRS_IOTHUB_MQTT_TRANSPORT_17_002: [ IoTHubTransport_MQTT_Common_Register shall return NULL if device or waitingToSend are NULL.]
3404 if ((handle == NULL) || (device == NULL) || (waitingToSend == NULL))
3405 {
3406 LogError("IoTHubTransport_MQTT_Common_Register: handle, device or waitingToSend is NULL.");
3407 result = NULL;
3408 }
3409 else
3410 {
3411 MQTTTRANSPORT_HANDLE_DATA* transport_data = (MQTTTRANSPORT_HANDLE_DATA*)handle;
3412
3413 // Codes_SRS_IOTHUB_MQTT_TRANSPORT_03_001: [ IoTHubTransport_MQTT_Common_Register shall return NULL if deviceId, or both deviceKey and deviceSasToken are NULL.]
3414 if (device->deviceId == NULL)
3415 {
3416 LogError("IoTHubTransport_MQTT_Common_Register: deviceId is NULL.");
3417 result = NULL;
3418 }
3419 // Codes_SRS_IOTHUB_MQTT_TRANSPORT_03_002: [ IoTHubTransport_MQTT_Common_Register shall return NULL if both deviceKey and deviceSasToken are provided.]
3420 else if ((device->deviceKey != NULL) && (device->deviceSasToken != NULL))
3421 {
3422 LogError("IoTHubTransport_MQTT_Common_Register: Both deviceKey and deviceSasToken are defined. Only one can be used.");
3423 result = NULL;
3424 }
3425 else
3426 {
3427 // Codes_SRS_IOTHUB_MQTT_TRANSPORT_17_003: [ IoTHubTransport_MQTT_Common_Register shall return NULL if deviceId or deviceKey do not match the deviceId and deviceKey passed in during IoTHubTransport_MQTT_Common_Create.]
3428 if (strcmp(STRING_c_str(transport_data->device_id), device->deviceId) != 0)
3429 {
3430 LogError("IoTHubTransport_MQTT_Common_Register: deviceId does not match.");
3431 result = NULL;
3432 }
3433 else if (!check_module_ids_equal(STRING_c_str(transport_data->module_id), device->moduleId))
3434 {
3435 LogError("IoTHubTransport_MQTT_Common_Register: moduleId does not match.");
3436 result = NULL;
3437 }
3438 // Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_43_001: [ IoTHubTransport_MQTT_Common_Register shall return NULL if deviceKey is NULL when credential type is IOTHUB_CREDENTIAL_TYPE_DEVICE_KEY ]
3439 else if (IoTHubClient_Auth_Get_Credential_Type(transport_data->authorization_module) == IOTHUB_CREDENTIAL_TYPE_DEVICE_KEY &&
3440 ((device->deviceKey == NULL) || (strcmp(IoTHubClient_Auth_Get_DeviceKey(transport_data->authorization_module), device->deviceKey) != 0)))
3441 {
3442 LogError("IoTHubTransport_MQTT_Common_Register: deviceKey does not match.");
3443 result = NULL;
3444 }
3445 else
3446 {
3447 if (transport_data->isRegistered == true)
3448 {
3449 LogError("Transport already has device registered by id: [%s]", device->deviceId);
3450 result = NULL;
3451 }
3452 else
3453 {
3454 transport_data->isRegistered = true;
3455 // Codes_SRS_IOTHUB_MQTT_TRANSPORT_17_004: [ IoTHubTransport_MQTT_Common_Register shall return the TRANSPORT_LL_HANDLE as the TRANSPORT_LL_HANDLE. ]
3456 result = (TRANSPORT_LL_HANDLE)handle;
3457 }
3458 }
3459 }
3460 }
3461
3462 return result;
3463}
3464
3465void IoTHubTransport_MQTT_Common_Unregister(TRANSPORT_LL_HANDLE deviceHandle)
3466{
3467 // Codes_SRS_IOTHUB_MQTT_TRANSPORT_17_005: [ If deviceHandle is NULL `IoTHubTransport_MQTT_Common_Unregister` shall do nothing. ]
3468 if (deviceHandle != NULL)
3469 {
3470 MQTTTRANSPORT_HANDLE_DATA* transport_data = (MQTTTRANSPORT_HANDLE_DATA*)deviceHandle;
3471
3472 transport_data->isRegistered = false;
3473 }
3474}
3475
3476STRING_HANDLE IoTHubTransport_MQTT_Common_GetHostname(TRANSPORT_LL_HANDLE handle)
3477{
3478 STRING_HANDLE result;
3479 /*Codes_SRS_IOTHUB_MQTT_TRANSPORT_02_001: [ If handle is NULL then IoTHubTransport_MQTT_Common_GetHostname shall fail and return NULL. ]*/
3480 if (handle == NULL)
3481 {
3482 result = NULL;
3483 }
3484 /*Codes_SRS_IOTHUB_MQTT_TRANSPORT_02_002: [ Otherwise IoTHubTransport_MQTT_Common_GetHostname shall return a non-NULL STRING_HANDLE containg the hostname. ]*/
3485 else if ((result = STRING_clone(((MQTTTRANSPORT_HANDLE_DATA*)(handle))->hostAddress)) == NULL)
3486 {
3487 LogError("Cannot provide the target host name (STRING_clone failed).");
3488 }
3489
3490 return result;
3491}
3492
3493IOTHUB_CLIENT_RESULT IoTHubTransport_MQTT_Common_SendMessageDisposition(MESSAGE_CALLBACK_INFO* message_data, IOTHUBMESSAGE_DISPOSITION_RESULT disposition)
3494{
3495 (void)disposition;
3496
3497 IOTHUB_CLIENT_RESULT result;
3498 if (message_data)
3499 {
3500 if (message_data->messageHandle)
3501 {
3502 IoTHubMessage_Destroy(message_data->messageHandle);
3503 result = IOTHUB_CLIENT_OK;
3504 }
3505 else
3506 {
3507 /*Codes_SRS_IOTHUB_MQTT_TRANSPORT_10_002: [If any of the messageData fields are NULL, IoTHubTransport_MQTT_Common_SendMessageDisposition shall fail and return IOTHUB_CLIENT_ERROR. ]*/
3508 LogError("message handle is NULL");
3509 result = IOTHUB_CLIENT_ERROR;
3510 }
3511 free(message_data);
3512 }
3513 else
3514 {
3515 /*Codes_SRS_IOTHUB_MQTT_TRANSPORT_10_001: [If messageData is NULL, IoTHubTransport_MQTT_Common_SendMessageDisposition shall fail and return IOTHUB_CLIENT_ERROR. ]*/
3516 LogError("message_data is NULL");
3517 result = IOTHUB_CLIENT_ERROR;
3518 }
3519 return result;
3520}
3521
3522
3523int IoTHubTransport_MQTT_Common_Subscribe_InputQueue(TRANSPORT_LL_HANDLE handle)
3524{
3525 int result;
3526 PMQTTTRANSPORT_HANDLE_DATA transport_data = (PMQTTTRANSPORT_HANDLE_DATA)handle;
3527 if (transport_data == NULL)
3528 {
3529 // Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_31_066: [ If parameter handle is NULL than IoTHubTransport_MQTT_Common_Subscribe_InputQueue shall return a non-zero value.]
3530 LogError("Invalid handle parameter. NULL.");
3531 result = MU_FAILURE;
3532 }
3533 else if (transport_data->module_id == NULL)
3534 {
3535 // Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_31_073: [ If module ID is not set on the transpont, IoTHubTransport_MQTT_Common_Unsubscribe_InputQueue shall fail.]
3536 LogError("ModuleID must be specified for input queues. NULL.");
3537 result = MU_FAILURE;
3538 }
3539 else if ((transport_data->topic_InputQueue == NULL) &&
3540 (transport_data->topic_InputQueue = STRING_construct_sprintf(TOPIC_INPUT_QUEUE_NAME, STRING_c_str(transport_data->device_id), STRING_c_str(transport_data->module_id))) == NULL)
3541 {
3542 LogError("Failure constructing Message Topic");
3543 result = MU_FAILURE;
3544 }
3545 else
3546 {
3547 // Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_31_067: [ IoTHubTransport_MQTT_Common_Subscribe_InputQueue shall set a flag to enable mqtt_client_subscribe to be called to subscribe to the input queue Message Topic.]
3548 transport_data->topics_ToSubscribe |= SUBSCRIBE_INPUT_QUEUE_TOPIC;
3549 changeStateToSubscribeIfAllowed(transport_data);
3550 // Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_31_070: [ On success IoTHubTransport_MQTT_Common_Subscribe_InputQueue shall return 0.]
3551 result = 0;
3552 }
3553 return result;
3554}
3555
3556void IoTHubTransport_MQTT_Common_Unsubscribe_InputQueue(TRANSPORT_LL_HANDLE handle)
3557{
3558 PMQTTTRANSPORT_HANDLE_DATA transport_data = (PMQTTTRANSPORT_HANDLE_DATA)handle;
3559 if ((transport_data != NULL) && (transport_data->topic_InputQueue != NULL))
3560 {
3561 // Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_31_072: [ IoTHubTransport_MQTT_Common_Unsubscribe_InputQueue shall call mqtt_client_unsubscribe to unsubscribe the mqtt input queue message topic.]
3562 const char* unsubscribe[1];
3563 unsubscribe[0] = STRING_c_str(transport_data->topic_InputQueue);
3564 if (mqtt_client_unsubscribe(transport_data->mqttClient, get_next_packet_id(transport_data), unsubscribe, 1) != 0)
3565 {
3566 LogError("Failure calling mqtt_client_unsubscribe");
3567 }
3568 STRING_delete(transport_data->topic_InputQueue);
3569 transport_data->topic_InputQueue = NULL;
3570 transport_data->topics_ToSubscribe &= ~SUBSCRIBE_INPUT_QUEUE_TOPIC;
3571 }
3572 else
3573 {
3574 // Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_31_071: [ If parameter handle is NULL then IoTHubTransport_MQTT_Common_Unsubscribe_InputQueue shall do nothing.]
3575 LogError("Invalid argument to unsubscribe input queue (NULL).");
3576 }
3577}
3578
3579int IoTHubTransport_MQTT_SetCallbackContext(TRANSPORT_LL_HANDLE handle, void* ctx)
3580{
3581 int result;
3582 if (handle == NULL)
3583 {
3584 LogError("Invalid parameter specified handle: %p", handle);
3585 result = MU_FAILURE;
3586 }
3587 else
3588 {
3589 MQTTTRANSPORT_HANDLE_DATA* transport_data = (MQTTTRANSPORT_HANDLE_DATA*)handle;
3590 transport_data->transport_ctx = ctx;
3591 result = 0;
3592 }
3593 return result;
3594}
Note: See TracBrowser for help on using the repository browser.