Changeset 464 for azure_iot_hub_f767zi/trunk/azure_iot_sdk/iothub_client/src/iothubtransport_mqtt_common.c
- Timestamp:
- Jun 22, 2021, 9:00:19 PM (3 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
azure_iot_hub_f767zi/trunk/azure_iot_sdk/iothub_client/src/iothubtransport_mqtt_common.c
r457 r464 82 82 83 83 static const char* MESSAGE_ID_PROPERTY = "mid"; 84 static const char* MESSAGE_CREATION_TIME_UTC = "ctime"; 85 static const char* MESSAGE_USER_ID = "uid"; 84 86 static const char* CORRELATION_ID_PROPERTY = "cid"; 85 87 static const char* CONTENT_TYPE_PROPERTY = "ct"; … … 200 202 MQTT_CLIENT_STATUS mqttClientStatus; 201 203 bool isDestroyCalled; 202 bool isRetryExpiredCallback Set;204 bool isRetryExpiredCallbackCalled; 203 205 bool device_twin_get_sent; 204 206 bool twin_resp_sub_recv; … … 274 276 } DEVICE_METHOD_INFO; 275 277 276 static void free_proxy_data(MQTTTRANSPORT_HANDLE_DATA* mqtt_transport_instance) 277 { 278 if (mqtt_transport_instance->http_proxy_hostname != NULL) 279 { 280 free(mqtt_transport_instance->http_proxy_hostname); 281 mqtt_transport_instance->http_proxy_hostname = NULL; 282 } 283 284 if (mqtt_transport_instance->http_proxy_username != NULL) 285 { 286 free(mqtt_transport_instance->http_proxy_username); 287 mqtt_transport_instance->http_proxy_username = NULL; 288 } 289 290 if (mqtt_transport_instance->http_proxy_password != NULL) 291 { 292 free(mqtt_transport_instance->http_proxy_password); 293 mqtt_transport_instance->http_proxy_password = NULL; 294 } 295 } 296 297 // Destroys xio transport associated with MQTT handle and resets appropriate state 278 // 279 // InternStrnicmp implements strnicmp. strnicmp isn't available on all platforms. 280 // 281 static int InternStrnicmp(const char* s1, const char* s2, size_t n) 282 { 283 int result; 284 285 if (s1 == NULL) 286 { 287 result = -1; 288 } 289 else if (s2 == NULL) 290 { 291 result = 1; 292 } 293 else 294 { 295 result = 0; 296 297 while (n-- && result == 0) 298 { 299 if (*s1 == 0) result = -1; 300 else if (*s2 == 0) result = 1; 301 else 302 { 303 304 result = TOLOWER(*s1) - TOLOWER(*s2); 305 ++s1; 306 ++s2; 307 } 308 } 309 } 310 return result; 311 } 312 313 // 314 // freeProxyData free()'s and resets proxy related settings of the mqtt_transport_instance. 315 // 316 static void freeProxyData(MQTTTRANSPORT_HANDLE_DATA* transport_data) 317 { 318 if (transport_data->http_proxy_hostname != NULL) 319 { 320 free(transport_data->http_proxy_hostname); 321 transport_data->http_proxy_hostname = NULL; 322 } 323 324 if (transport_data->http_proxy_username != NULL) 325 { 326 free(transport_data->http_proxy_username); 327 transport_data->http_proxy_username = NULL; 328 } 329 330 if (transport_data->http_proxy_password != NULL) 331 { 332 free(transport_data->http_proxy_password); 333 transport_data->http_proxy_password = NULL; 334 } 335 } 336 337 // 338 // DestroyXioTransport frees resources associated with MQTT handle and resets appropriate state 339 // 298 340 static void DestroyXioTransport(PMQTTTRANSPORT_HANDLE_DATA transport_data) 299 341 { 342 mqtt_client_clear_xio(transport_data->mqttClient); 300 343 xio_destroy(transport_data->xioTransport); 301 344 transport_data->xioTransport = NULL; 302 345 } 303 346 304 static void set_saved_tls_options(PMQTTTRANSPORT_HANDLE_DATA transport, OPTIONHANDLER_HANDLE new_options) 347 // 348 // setSavedTlsOptions saves off TLS specific options. This is used 349 // so that during a disconnection, we have these values available for next reconnection. 350 // 351 static void setSavedTlsOptions(PMQTTTRANSPORT_HANDLE_DATA transport, OPTIONHANDLER_HANDLE new_options) 305 352 { 306 353 if (transport->saved_tls_options != NULL) … … 311 358 } 312 359 313 static void free_transport_handle_data(MQTTTRANSPORT_HANDLE_DATA* transport_data) 360 // 361 // freeTransportHandleData free()'s 'the transport_data and all members that were allocated by it. 362 // 363 static void freeTransportHandleData(MQTTTRANSPORT_HANDLE_DATA* transport_data) 314 364 { 315 365 if (transport_data->mqttClient != NULL) 316 366 { 317 367 mqtt_client_deinit(transport_data->mqttClient); 368 transport_data->mqttClient = NULL; 318 369 } 319 370 … … 323 374 } 324 375 325 set _saved_tls_options(transport_data, NULL);376 setSavedTlsOptions(transport_data, NULL); 326 377 327 378 tickcounter_destroy(transport_data->msgTickCounter); 328 379 329 free _proxy_data(transport_data);380 freeProxyData(transport_data); 330 381 331 382 STRING_delete(transport_data->devicesAndModulesPath); … … 346 397 } 347 398 348 int IoTHubTransport_MQTT_Common_SetRetryPolicy(TRANSPORT_LL_HANDLE handle, IOTHUB_CLIENT_RETRY_POLICY retryPolicy, size_t retryTimeoutLimitInSeconds) 349 { 350 int result; 351 352 if (handle == NULL) 353 { 354 /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_25_041: [**If any handle is NULL then IoTHubTransport_MQTT_Common_SetRetryPolicy shall return resultant line.] */ 355 LogError("Invalid handle parameter. NULL."); 356 result = MU_FAILURE; 357 } 358 else 359 { 360 RETRY_CONTROL_HANDLE new_retry_control_handle; 361 362 // Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_09_006: [ IoTHubTransport_MQTT_Common_SetRetryPolicy shall set the retry logic by calling retry_control_create() with retry policy and retryTimeout as parameters] 363 // Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_09_009: [ If retry_control_create() fails then IoTHubTransport_MQTT_Common_SetRetryPolicy shall revert to previous retry policy and return non-zero value ] 364 if ((new_retry_control_handle = retry_control_create(retryPolicy, (unsigned int)retryTimeoutLimitInSeconds)) == NULL) 365 { 366 LogError("Failed creating new retry control handle"); 367 result = MU_FAILURE; 368 } 369 else 370 { 371 PMQTTTRANSPORT_HANDLE_DATA transport_data = (PMQTTTRANSPORT_HANDLE_DATA)handle; 372 RETRY_CONTROL_HANDLE previous_retry_control_handle = transport_data->retry_control_handle; 373 374 transport_data->retry_control_handle = new_retry_control_handle; 375 retry_control_destroy(previous_retry_control_handle); 376 377 /*Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_25_045: [**If retry logic for specified parameters of retry policy and retryTimeoutLimitInSeconds is created successfully then IoTHubTransport_MQTT_Common_SetRetryPolicy shall return 0]*/ 378 result = 0; 379 } 380 } 381 382 return result; 383 } 384 385 static uint16_t get_next_packet_id(PMQTTTRANSPORT_HANDLE_DATA transport_data) 399 // 400 // getNextPacketId gets the next Packet Id to use and increments internal counter. 401 // 402 static uint16_t getNextPacketId(PMQTTTRANSPORT_HANDLE_DATA transport_data) 386 403 { 387 404 if (transport_data->packetId + 1 >= USHRT_MAX) … … 397 414 398 415 #ifndef NO_LOGGING 399 static const char* retrieve_mqtt_return_codes(CONNECT_RETURN_CODE rtn_code) 416 // 417 // retrieveMqttReturnCodes returns friendly representation of connection code for logging purposes. 418 // 419 static const char* retrieveMqttReturnCodes(CONNECT_RETURN_CODE rtn_code) 400 420 { 401 421 switch (rtn_code) … … 420 440 #endif // NO_LOGGING 421 441 422 static int retrieve_device_method_rid_info(const char* resp_topic, STRING_HANDLE method_name, STRING_HANDLE request_id) 442 // 443 // retrievDeviceMethodRidInfo parses an incoming MQTT topic for a device method and retrieves the request ID it specifies. 444 // 445 static int retrievDeviceMethodRidInfo(const char* resp_topic, STRING_HANDLE method_name, STRING_HANDLE request_id) 423 446 { 424 447 int result; … … 484 507 } 485 508 486 static int parse_device_twin_topic_info(const char* resp_topic, bool* patch_msg, size_t* request_id, int* status_code) 509 // 510 // parseDeviceTwinTopicInfo parses information about a topic PUBLISH'd to this device/module. 511 // 512 static int parseDeviceTwinTopicInfo(const char* resp_topic, bool* patch_msg, size_t* request_id, int* status_code) 487 513 { 488 514 int result; … … 548 574 } 549 575 550 static int InternStrnicmp(const char* s1, const char* s2, size_t n) 551 { 552 int result; 553 554 if (s1 == NULL) result = -1; 555 else if (s2 == NULL) result = 1; 556 else 557 { 558 result = 0; 559 560 while (n-- && result == 0) 561 { 562 if (*s1 == 0) result = -1; 563 else if (*s2 == 0) result = 1; 564 else 565 { 566 567 result = TOLOWER(*s1) - TOLOWER(*s2); 568 ++s1; 569 ++s2; 570 } 571 } 572 } 573 return result; 574 } 575 576 static IOTHUB_IDENTITY_TYPE retrieve_topic_type(const char* topic_resp, const char* input_queue) 576 // 577 // retrieveTopicType translates an MQTT topic PUBLISH'd to this device/module into what type (e.g. twin, method, etc.) it represents. 578 // 579 static IOTHUB_IDENTITY_TYPE retrieveTopicType(const char* topic_resp, const char* input_queue) 577 580 { 578 581 IOTHUB_IDENTITY_TYPE type; … … 598 601 } 599 602 600 static void sendMsgComplete(IOTHUB_MESSAGE_LIST* iothubMsgList, PMQTTTRANSPORT_HANDLE_DATA transport_data, IOTHUB_CLIENT_CONFIRMATION_RESULT confirmResult) 603 // 604 // notifyApplicationOfSendMessageComplete lets application know that messages in the iothubMsgList have completed (or should be considered failed) with confirmResult status. 605 // 606 static void notifyApplicationOfSendMessageComplete(IOTHUB_MESSAGE_LIST* iothubMsgList, PMQTTTRANSPORT_HANDLE_DATA transport_data, IOTHUB_CLIENT_CONFIRMATION_RESULT confirmResult) 601 607 { 602 608 DLIST_ENTRY messageCompleted; … … 606 612 } 607 613 614 // 615 // addUserPropertiesTouMqttMessage translates application properties in iothub_message_handle (set by the application with IoTHubMessage_SetProperty e.g.) 616 // into a representation in the MQTT TOPIC topic_string. 617 // 608 618 static int addUserPropertiesTouMqttMessage(IOTHUB_MESSAGE_HANDLE iothub_message_handle, STRING_HANDLE topic_string, size_t* index_ptr, bool urlencode) 609 619 { … … 660 670 } 661 671 672 // 673 // addSystemPropertyToTopicString appends a given "system" property from iothub_message_handle (set by the application with APIs such as IoTHubMessage_SetMessageId, 674 // IoTHubMessage_SetContentTypeSystemProperty, etc.) onto the MQTT TOPIC topic_string. 675 // 662 676 static int addSystemPropertyToTopicString(STRING_HANDLE topic_string, size_t index, const char* property_key, const char* property_value, bool urlencode) 663 677 { … … 690 704 } 691 705 706 // 707 // addSystemPropertyToTopicString appends all "system" property from iothub_message_handle (set by the application with APIs such as IoTHubMessage_SetMessageId, 708 // IoTHubMessage_SetContentTypeSystemProperty, etc.) onto the MQTT TOPIC topic_string. 709 // 692 710 static int addSystemPropertiesTouMqttMessage(IOTHUB_MESSAGE_HANDLE iothub_message_handle, STRING_HANDLE topic_string, size_t* index_ptr, bool urlencode) 693 711 { … … 734 752 } 735 753 } 754 755 if (result == 0) 756 { 757 const char* message_creation_time_utc = IoTHubMessage_GetMessageCreationTimeUtcSystemProperty(iothub_message_handle); 758 if (message_creation_time_utc != NULL) 759 { 760 result = addSystemPropertyToTopicString(topic_string, index, MESSAGE_CREATION_TIME_UTC, message_creation_time_utc, urlencode); 761 index++; 762 } 763 } 764 736 765 if (result == 0) 737 766 { … … 754 783 } 755 784 785 // 786 // addDiagnosticPropertiesTouMqttMessage appends diagnostic data (as specified by IoTHubMessage_SetDiagnosticPropertyData) onto 787 // the MQTT topic topic_string. 788 // 756 789 static int addDiagnosticPropertiesTouMqttMessage(IOTHUB_MESSAGE_HANDLE iothub_message_handle, STRING_HANDLE topic_string, size_t* index_ptr) 757 790 { … … 821 854 } 822 855 823 856 // 857 // addPropertiesTouMqttMessage adds user, "system", and diagnostic messages onto MQTT topic string. Note that "system" properties is a 858 // construct of the SDK and IoT Hub. The MQTT protocol itself does not assign any significance to system and user properties (as opposed to AMQP). 859 // The IOTHUB_MESSAGE_HANDLE structure however does have well-known properties (e.g. IoTHubMessage_SetMessageId) that the SDK treats as system 860 // properties where we can automatically fill in the key value for in the key=value list. 861 // 824 862 static STRING_HANDLE addPropertiesTouMqttMessage(IOTHUB_MESSAGE_HANDLE iothub_message_handle, const char* eventTopic, bool urlencode) 825 863 { … … 868 906 } 869 907 870 static int publish_mqtt_telemetry_msg(PMQTTTRANSPORT_HANDLE_DATA transport_data, MQTT_MESSAGE_DETAILS_LIST* mqttMsgEntry, const unsigned char* payload, size_t len) 908 // 909 // publishTelemetryMsg invokes the umqtt layer to send a PUBLISH message. 910 // 911 static int publishTelemetryMsg(PMQTTTRANSPORT_HANDLE_DATA transport_data, MQTT_MESSAGE_DETAILS_LIST* mqttMsgEntry, const unsigned char* payload, size_t len) 871 912 { 872 913 int result; … … 912 953 } 913 954 914 static int publish_device_method_message(MQTTTRANSPORT_HANDLE_DATA* transport_data, int status_code, STRING_HANDLE request_id, const unsigned char* response, size_t response_size) 955 // 956 // publishDeviceMethodResponseMsg invokes the umqtt to send a PUBLISH message that contains device method call results. 957 // 958 static int publishDeviceMethodResponseMsg(MQTTTRANSPORT_HANDLE_DATA* transport_data, int status_code, STRING_HANDLE request_id, const unsigned char* response, size_t response_size) 915 959 { 916 960 int result; 917 uint16_t packet_id = get _next_packet_id(transport_data);961 uint16_t packet_id = getNextPacketId(transport_data); 918 962 919 963 STRING_HANDLE msg_topic = STRING_construct_sprintf(DEVICE_METHOD_RESPONSE_TOPIC, status_code, STRING_c_str(request_id)); … … 949 993 } 950 994 951 952 static void destroy_device_twin_get_message(MQTT_DEVICE_TWIN_ITEM* msg_entry) 995 // 996 // destroyDeviceTwinGetMsg frees msg_entry and any data associated with it. 997 // 998 static void destroyDeviceTwinGetMsg(MQTT_DEVICE_TWIN_ITEM* msg_entry) 953 999 { 954 1000 free(msg_entry); 955 1001 } 956 1002 957 static MQTT_DEVICE_TWIN_ITEM* create_device_twin_message(MQTTTRANSPORT_HANDLE_DATA* transport_data, DEVICE_TWIN_MSG_TYPE device_twin_msg_type, uint32_t iothub_msg_id) 1003 // 1004 // createDeviceTwinMsg allocates and fills in structure for MQTT_DEVICE_TWIN_ITEM. 1005 // 1006 static MQTT_DEVICE_TWIN_ITEM* createDeviceTwinMsg(MQTTTRANSPORT_HANDLE_DATA* transport_data, DEVICE_TWIN_MSG_TYPE device_twin_msg_type, uint32_t iothub_msg_id) 958 1007 { 959 1008 MQTT_DEVICE_TWIN_ITEM* result; … … 974 1023 memset(result, 0, sizeof(*result)); 975 1024 result->msgCreationTime = current_time; 976 result->packet_id = get _next_packet_id(transport_data);1025 result->packet_id = getNextPacketId(transport_data); 977 1026 result->iothub_msg_id = iothub_msg_id; 978 1027 result->device_twin_msg_type = device_twin_msg_type; … … 982 1031 } 983 1032 984 static int publish_device_twin_get_message(MQTTTRANSPORT_HANDLE_DATA* transport_data, MQTT_DEVICE_TWIN_ITEM* mqtt_info) 1033 // 1034 // publishDeviceTwinGetMsg invokes umqtt to PUBLISH a request to get the twin information. 1035 // 1036 static int publishDeviceTwinGetMsg(MQTTTRANSPORT_HANDLE_DATA* transport_data, MQTT_DEVICE_TWIN_ITEM* mqtt_info) 985 1037 { 986 1038 int result; … … 1020 1072 } 1021 1073 1074 // 1075 // sendPendingGetTwinRequests will send any queued up GetTwin requests during a DoWork loop. 1076 // 1022 1077 static void sendPendingGetTwinRequests(PMQTTTRANSPORT_HANDLE_DATA transportData) 1023 1078 { … … 1031 1086 (void)DList_RemoveEntryList(dev_twin_item); 1032 1087 1033 if (publish _device_twin_get_message(transportData, msg_entry) != 0)1088 if (publishDeviceTwinGetMsg(transportData, msg_entry) != 0) 1034 1089 { 1035 1090 LogError("Failed sending pending get twin request"); 1036 destroy _device_twin_get_message(msg_entry);1091 destroyDeviceTwinGetMsg(msg_entry); 1037 1092 } 1038 1093 else … … 1045 1100 } 1046 1101 1047 1102 // 1103 // removeExpiredTwinRequestsFromList removes any requests that have timed out. 1104 // 1048 1105 static void removeExpiredTwinRequestsFromList(PMQTTTRANSPORT_HANDLE_DATA transport_data, tickcounter_ms_t current_ms, DLIST_ENTRY* twin_list) 1049 1106 { 1050 1107 PDLIST_ENTRY list_item = twin_list->Flink; 1051 1108 1052 1109 while (list_item != twin_list) 1053 1110 { … … 1076 1133 { 1077 1134 (void)DList_RemoveEntryList(list_item); 1078 destroy _device_twin_get_message(msg_entry);1135 destroyDeviceTwinGetMsg(msg_entry); 1079 1136 } 1080 1137 … … 1084 1141 } 1085 1142 1143 // 1144 // removeExpiredTwinRequests removes any requests that have timed out, regardless of how the request invoked. 1145 // 1086 1146 static void removeExpiredTwinRequests(PMQTTTRANSPORT_HANDLE_DATA transport_data) 1087 1147 { … … 1095 1155 } 1096 1156 1097 static int publish_device_twin_message(MQTTTRANSPORT_HANDLE_DATA* transport_data, IOTHUB_DEVICE_TWIN* device_twin_info, MQTT_DEVICE_TWIN_ITEM* mqtt_info) 1157 // 1158 // publishDeviceTwinMsg invokes umqtt to PUBLISH a request for the twin. 1159 // 1160 static int publishDeviceTwinMsg(MQTTTRANSPORT_HANDLE_DATA* transport_data, IOTHUB_DEVICE_TWIN* device_twin_info, MQTT_DEVICE_TWIN_ITEM* mqtt_info) 1098 1161 { 1099 1162 int result; … … 1149 1212 } 1150 1213 1214 // 1215 // changeStateToSubscribeIfAllowed attempts to transition the state machine to subscribe, if our 1216 // current state will allow it. 1217 // This function does NOT immediately send the SUBSCRIBE however, instead setting things up 1218 // so the next time DoWork is invoked then the SUBSCRIBE will happen. 1219 // 1151 1220 static void changeStateToSubscribeIfAllowed(PMQTTTRANSPORT_HANDLE_DATA transport_data) 1152 1221 { … … 1160 1229 } 1161 1230 1231 // 1232 // subscribeToNotifyStateIfNeeded sets up to subscribe to the server. 1233 // 1162 1234 static int subscribeToNotifyStateIfNeeded(PMQTTTRANSPORT_HANDLE_DATA transport_data) 1163 1235 { … … 1191 1263 } 1192 1264 1193 1265 // 1266 // isSystemProperty returns whether a given property name in an MQTT TOPIC published to this device/module 1267 // is considered a "system". MQTT does not have a protocol defined concept of system properties. In this usage 1268 // it implies that the IOTHUB_MESSAGE_HANDLE has an API for direct manipulation of the property (e.g. IoTHubMessage_GetMessageId). 1269 // 1194 1270 static bool isSystemProperty(const char* tokenData) 1195 1271 { … … 1197 1273 size_t propCount = sizeof(sysPropList) / sizeof(sysPropList[0]); 1198 1274 size_t index = 0; 1275 size_t tokenDataLength = strlen(tokenData); 1276 1199 1277 for (index = 0; index < propCount; index++) 1200 1278 { 1201 if (memcmp(tokenData, sysPropList[index].propName, sysPropList[index].propLength) == 0) 1279 if (tokenDataLength >= sysPropList[index].propLength && 1280 memcmp(tokenData, sysPropList[index].propName, sysPropList[index].propLength) == 0) 1202 1281 { 1203 1282 result = true; … … 1208 1287 } 1209 1288 1210 // Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_31_061: [ If the message is sent to an input queue, `IoTHubTransport_MQTT_Common_DoWork` shall parse out to the input queue name and store it in the message with IoTHubMessage_SetInputName ] 1211 // Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_31_062: [ If IoTHubTransport_MQTT_Common_DoWork receives a malformatted inputQueue, it shall fail ] 1212 static int addInputNamePropertyToMessage(IOTHUB_MESSAGE_HANDLE IoTHubMessage, const char* topic_name) 1213 { 1289 // 1290 // addInputNamePropertyToMsg translates the input name (embedded in the MQTT topic name) into the IoTHubMessage handle 1291 // such that the application can call IoTHubMessage_GetInputName() to retrieve this. This is only currently used 1292 // in IoT Edge module to module message communication, so that this module receiving the message can know which module invoked in. 1293 // 1294 static int addInputNamePropertyToMsg(IOTHUB_MESSAGE_HANDLE IoTHubMessage, const char* topic_name) 1295 { 1296 // Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_31_061: [ If the message is sent to an input queue, `IoTHubTransport_MQTT_Common_DoWork` shall parse out to the input queue name and store it in the message with IoTHubMessage_SetInputName ] 1297 // Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_31_062: [ If IoTHubTransport_MQTT_Common_DoWork receives a malformatted inputQueue, it shall fail ] 1298 1214 1299 int result = MU_FAILURE; 1215 1300 int number_tokens_read = 0; … … 1261 1346 } 1262 1347 1348 // 1349 // setMqttMessagePropertyIfPossible attempts to translate a "system" property into the IOTHUB_MESSAGE_HANDLE that will be provided to the 1350 // application's callback. 1351 // 1263 1352 static int setMqttMessagePropertyIfPossible(IOTHUB_MESSAGE_HANDLE IoTHubMessage, const char* propName, const char* propValue, size_t nameLen) 1264 1353 { … … 1266 1355 int result = 0; 1267 1356 1357 if (nameLen > 5) 1358 { 1359 if (strcmp((const char*)&propName[nameLen - 5], MESSAGE_CREATION_TIME_UTC) == 0) 1360 { 1361 if (IoTHubMessage_SetMessageCreationTimeUtcSystemProperty(IoTHubMessage, propValue) != IOTHUB_MESSAGE_OK) 1362 { 1363 LogError("Failed to set IOTHUB_MESSAGE_HANDLE 'CreationTimeUtc' property."); 1364 result = MU_FAILURE; 1365 } 1366 return result; 1367 } 1368 } 1369 1268 1370 if (nameLen > 4) 1269 1371 { … … 1309 1411 return result; 1310 1412 } 1413 else if (strcmp((const char*)&propName[nameLen - 3], MESSAGE_USER_ID) == 0) 1414 { 1415 if (IoTHubMessage_SetMessageUserIdSystemProperty(IoTHubMessage, propValue) != IOTHUB_MESSAGE_OK) 1416 { 1417 LogError("Failed to set IOTHUB_MESSAGE_HANDLE 'userId' property."); 1418 result = MU_FAILURE; 1419 } 1420 return result; 1421 } 1311 1422 } 1312 1423 … … 1338 1449 } 1339 1450 1451 // 1452 // extractMqttProperties parses the MQTT topic PUBLISH'd to this device/module, retrieves properties and fills out the 1453 // IOTHUB_MESSAGE_HANDLE which will ultimately be delivered to the application callback. 1454 // 1340 1455 static int extractMqttProperties(IOTHUB_MESSAGE_HANDLE IoTHubMessage, const char* topic_name, bool urldecode) 1341 1456 { … … 1503 1618 } 1504 1619 1505 static void mqtt_notification_callback(MQTT_MESSAGE_HANDLE msgHandle, void* callbackCtx) 1506 { 1507 /* Tests_SRS_IOTHUB_MQTT_TRANSPORT_07_051: [ If msgHandle or callbackCtx is NULL, mqtt_notification_callback shall do nothing. ] */ 1508 if (msgHandle != NULL && callbackCtx != NULL) 1509 { 1510 /* Tests_SRS_IOTHUB_MQTT_TRANSPORT_07_052: [ mqtt_notification_callback shall extract the topic Name from the MQTT_MESSAGE_HANDLE. ] */ 1511 const char* topic_resp = mqttmessage_getTopicName(msgHandle); 1512 if (topic_resp == NULL) 1513 { 1514 LogError("Failure: NULL topic name encountered"); 1620 // 1621 // processTwinNotification processes device and module twin updates made by IoT Hub / IoT Edge. 1622 // 1623 static void processTwinNotification(PMQTTTRANSPORT_HANDLE_DATA transportData, MQTT_MESSAGE_HANDLE msgHandle, const char* topic_resp) 1624 { 1625 size_t request_id; 1626 int status_code; 1627 bool notification_msg; 1628 1629 if (parseDeviceTwinTopicInfo(topic_resp, ¬ification_msg, &request_id, &status_code) != 0) 1630 { 1631 LogError("Failure: parsing device topic info"); 1632 } 1633 else 1634 { 1635 const APP_PAYLOAD* payload = mqttmessage_getApplicationMsg(msgHandle); 1636 if (notification_msg) 1637 { 1638 transportData->transport_callbacks.twin_retrieve_prop_complete_cb(DEVICE_TWIN_UPDATE_PARTIAL, payload->message, payload->length, transportData->transport_ctx); 1515 1639 } 1516 1640 else 1517 1641 { 1518 PMQTTTRANSPORT_HANDLE_DATA transportData = (PMQTTTRANSPORT_HANDLE_DATA)callbackCtx; 1519 1520 IOTHUB_IDENTITY_TYPE type = retrieve_topic_type(topic_resp, STRING_c_str(transportData->topic_InputQueue)); 1521 if (type == IOTHUB_TYPE_DEVICE_TWIN) 1522 { 1523 size_t request_id; 1524 int status_code; 1525 bool notification_msg; 1526 if (parse_device_twin_topic_info(topic_resp, ¬ification_msg, &request_id, &status_code) != 0) 1527 { 1528 LogError("Failure: parsing device topic info"); 1529 } 1530 else 1531 { 1532 const APP_PAYLOAD* payload = mqttmessage_getApplicationMsg(msgHandle); 1533 if (notification_msg) 1642 PDLIST_ENTRY dev_twin_item = transportData->ack_waiting_queue.Flink; 1643 while (dev_twin_item != &transportData->ack_waiting_queue) 1644 { 1645 DLIST_ENTRY saveListEntry; 1646 saveListEntry.Flink = dev_twin_item->Flink; 1647 MQTT_DEVICE_TWIN_ITEM* msg_entry = containingRecord(dev_twin_item, MQTT_DEVICE_TWIN_ITEM, entry); 1648 if (request_id == msg_entry->packet_id) 1649 { 1650 (void)DList_RemoveEntryList(dev_twin_item); 1651 if (msg_entry->device_twin_msg_type == RETRIEVE_PROPERTIES) 1534 1652 { 1535 transportData->transport_callbacks.twin_retrieve_prop_complete_cb(DEVICE_TWIN_UPDATE_PARTIAL, payload->message, payload->length, transportData->transport_ctx); 1653 if (msg_entry->userCallback == NULL) 1654 { 1655 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_054: [ If type is IOTHUB_TYPE_DEVICE_TWIN, then on success if msg_type is RETRIEVE_PROPERTIES then mqttNotificationCallback shall call IoTHubClientCore_LL_RetrievePropertyComplete... ] */ 1656 transportData->transport_callbacks.twin_retrieve_prop_complete_cb(DEVICE_TWIN_UPDATE_COMPLETE, payload->message, payload->length, transportData->transport_ctx); 1657 // Only after receiving device twin request should we start listening for patches. 1658 (void)subscribeToNotifyStateIfNeeded(transportData); 1659 } 1660 else 1661 { 1662 // This is a on-demand get twin request. 1663 msg_entry->userCallback(DEVICE_TWIN_UPDATE_COMPLETE, payload->message, payload->length, msg_entry->userContext); 1664 } 1536 1665 } 1537 1666 else 1538 1667 { 1539 PDLIST_ENTRY dev_twin_item = transportData->ack_waiting_queue.Flink; 1540 while (dev_twin_item != &transportData->ack_waiting_queue) 1541 { 1542 DLIST_ENTRY saveListEntry; 1543 saveListEntry.Flink = dev_twin_item->Flink; 1544 MQTT_DEVICE_TWIN_ITEM* msg_entry = containingRecord(dev_twin_item, MQTT_DEVICE_TWIN_ITEM, entry); 1545 if (request_id == msg_entry->packet_id) 1546 { 1547 (void)DList_RemoveEntryList(dev_twin_item); 1548 if (msg_entry->device_twin_msg_type == RETRIEVE_PROPERTIES) 1549 { 1550 if (msg_entry->userCallback == NULL) 1551 { 1552 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_054: [ If type is IOTHUB_TYPE_DEVICE_TWIN, then on success if msg_type is RETRIEVE_PROPERTIES then mqtt_notification_callback shall call IoTHubClientCore_LL_RetrievePropertyComplete... ] */ 1553 transportData->transport_callbacks.twin_retrieve_prop_complete_cb(DEVICE_TWIN_UPDATE_COMPLETE, payload->message, payload->length, transportData->transport_ctx); 1554 // Only after receiving device twin request should we start listening for patches. 1555 (void)subscribeToNotifyStateIfNeeded(transportData); 1556 } 1557 else 1558 { 1559 // This is a on-demand get twin request. 1560 msg_entry->userCallback(DEVICE_TWIN_UPDATE_COMPLETE, payload->message, payload->length, msg_entry->userContext); 1561 } 1562 } 1563 else 1564 { 1565 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_055: [ if device_twin_msg_type is not RETRIEVE_PROPERTIES then mqtt_notification_callback shall call IoTHubClientCore_LL_ReportedStateComplete ] */ 1566 transportData->transport_callbacks.twin_rpt_state_complete_cb(msg_entry->iothub_msg_id, status_code, transportData->transport_ctx); 1567 // Only after receiving device twin request should we start listening for patches. 1568 (void)subscribeToNotifyStateIfNeeded(transportData); 1569 } 1570 1571 destroy_device_twin_get_message(msg_entry); 1572 break; 1573 } 1574 dev_twin_item = saveListEntry.Flink; 1575 } 1668 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_055: [ if device_twin_msg_type is not RETRIEVE_PROPERTIES then mqttNotificationCallback shall call IoTHubClientCore_LL_ReportedStateComplete ] */ 1669 transportData->transport_callbacks.twin_rpt_state_complete_cb(msg_entry->iothub_msg_id, status_code, transportData->transport_ctx); 1670 // Only after receiving device twin request should we start listening for patches. 1671 (void)subscribeToNotifyStateIfNeeded(transportData); 1576 1672 } 1577 } 1673 1674 destroyDeviceTwinGetMsg(msg_entry); 1675 break; 1676 } 1677 dev_twin_item = saveListEntry.Flink; 1678 } 1679 } 1680 } 1681 } 1682 1683 // 1684 // processDeviceMethodNotification processes a device and module method invocations made by IoT Hub / IoT Edge. 1685 // 1686 static void processDeviceMethodNotification(PMQTTTRANSPORT_HANDLE_DATA transportData, MQTT_MESSAGE_HANDLE msgHandle, const char* topic_resp) 1687 { 1688 STRING_HANDLE method_name = STRING_new(); 1689 if (method_name == NULL) 1690 { 1691 LogError("Failure: allocating method_name string value"); 1692 } 1693 else 1694 { 1695 DEVICE_METHOD_INFO* dev_method_info = malloc(sizeof(DEVICE_METHOD_INFO)); 1696 if (dev_method_info == NULL) 1697 { 1698 LogError("Failure: allocating DEVICE_METHOD_INFO object"); 1699 } 1700 else 1701 { 1702 dev_method_info->request_id = STRING_new(); 1703 if (dev_method_info->request_id == NULL) 1704 { 1705 LogError("Failure constructing request_id string"); 1706 free(dev_method_info); 1707 } 1708 else if (retrievDeviceMethodRidInfo(topic_resp, method_name, dev_method_info->request_id) != 0) 1709 { 1710 LogError("Failure: retrieve device topic info"); 1711 STRING_delete(dev_method_info->request_id); 1712 free(dev_method_info); 1713 } 1714 else 1715 { 1716 /* CodesSRS_IOTHUB_MQTT_TRANSPORT_07_053: [ If type is IOTHUB_TYPE_DEVICE_METHODS, then on success mqttNotificationCallback shall call IoTHubClientCore_LL_DeviceMethodComplete. ] */ 1717 const APP_PAYLOAD* payload = mqttmessage_getApplicationMsg(msgHandle); 1718 if (transportData->transport_callbacks.method_complete_cb(STRING_c_str(method_name), payload->message, payload->length, (void*)dev_method_info, transportData->transport_ctx) != 0) 1719 { 1720 LogError("Failure: IoTHubClientCore_LL_DeviceMethodComplete"); 1721 STRING_delete(dev_method_info->request_id); 1722 free(dev_method_info); 1723 } 1724 } 1725 } 1726 STRING_delete(method_name); 1727 } 1728 } 1729 1730 // 1731 // processIncomingMessageNotification processes both C2D messages and messages sent from one IoT Edge module into this module 1732 // 1733 static void processIncomingMessageNotification(PMQTTTRANSPORT_HANDLE_DATA transportData, MQTT_MESSAGE_HANDLE msgHandle, const char* topic_resp, IOTHUB_IDENTITY_TYPE type) 1734 { 1735 const APP_PAYLOAD* appPayload = mqttmessage_getApplicationMsg(msgHandle); 1736 IOTHUB_MESSAGE_HANDLE IoTHubMessage = IoTHubMessage_CreateFromByteArray(appPayload->message, appPayload->length); 1737 if (IoTHubMessage == NULL) 1738 { 1739 LogError("Failure: IotHub Message creation has failed."); 1740 } 1741 else if ((type == IOTHUB_TYPE_EVENT_QUEUE) && (addInputNamePropertyToMsg(IoTHubMessage, topic_resp) != 0)) 1742 { 1743 LogError("failure adding input name to property."); 1744 } 1745 else if (extractMqttProperties(IoTHubMessage, topic_resp, transportData->auto_url_encode_decode) != 0) 1746 { 1747 LogError("failure extracting mqtt properties."); 1748 } 1749 else 1750 { 1751 MESSAGE_CALLBACK_INFO* messageData = (MESSAGE_CALLBACK_INFO*)malloc(sizeof(MESSAGE_CALLBACK_INFO)); 1752 if (messageData == NULL) 1753 { 1754 LogError("malloc failed"); 1755 } 1756 else 1757 { 1758 messageData->messageHandle = IoTHubMessage; 1759 messageData->transportContext = NULL; 1760 1761 if (type == IOTHUB_TYPE_EVENT_QUEUE) 1762 { 1763 // Codes_SRS_IOTHUB_MQTT_TRANSPORT_31_065: [ If type is IOTHUB_TYPE_TELEMETRY and sent to an input queue, then on success `mqttNotificationCallback` shall call `IoTHubClient_LL_MessageCallback`. ] 1764 if (!transportData->transport_callbacks.msg_input_cb(messageData, transportData->transport_ctx)) 1765 { 1766 LogError("IoTHubClientCore_LL_MessageCallbackreturned false"); 1767 1768 IoTHubMessage_Destroy(IoTHubMessage); 1769 free(messageData); 1770 } 1771 } 1772 else 1773 { 1774 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_056: [ If type is IOTHUB_TYPE_TELEMETRY, then on success mqttNotificationCallback shall call IoTHubClientCore_LL_MessageCallback. ] */ 1775 if (!transportData->transport_callbacks.msg_cb(messageData, transportData->transport_ctx)) 1776 { 1777 LogError("IoTHubClientCore_LL_MessageCallback returned false"); 1778 IoTHubMessage_Destroy(IoTHubMessage); 1779 free(messageData); 1780 } 1781 } 1782 } 1783 } 1784 } 1785 1786 // 1787 // mqttNotificationCallback processes incoming PUBLISH messages sent from Hub (or IoT Edge) to this device. 1788 // This function is invoked by umqtt. It determines what topic the PUBLISH was directed at (e.g. Device Twin, Method, etc.), 1789 // performs further parsing based on topic, and translates this call up to "iothub_client" layer for ultimate delivery to application callback. 1790 // 1791 static void mqttNotificationCallback(MQTT_MESSAGE_HANDLE msgHandle, void* callbackCtx) 1792 { 1793 /* Tests_SRS_IOTHUB_MQTT_TRANSPORT_07_051: [ If msgHandle or callbackCtx is NULL, mqttNotificationCallback shall do nothing. ] */ 1794 if (msgHandle != NULL && callbackCtx != NULL) 1795 { 1796 /* Tests_SRS_IOTHUB_MQTT_TRANSPORT_07_052: [ mqttNotificationCallback shall extract the topic Name from the MQTT_MESSAGE_HANDLE. ] */ 1797 const char* topic_resp = mqttmessage_getTopicName(msgHandle); 1798 if (topic_resp == NULL) 1799 { 1800 LogError("Failure: NULL topic name encountered"); 1801 } 1802 else 1803 { 1804 PMQTTTRANSPORT_HANDLE_DATA transportData = (PMQTTTRANSPORT_HANDLE_DATA)callbackCtx; 1805 1806 IOTHUB_IDENTITY_TYPE type = retrieveTopicType(topic_resp, STRING_c_str(transportData->topic_InputQueue)); 1807 if (type == IOTHUB_TYPE_DEVICE_TWIN) 1808 { 1809 processTwinNotification(transportData, msgHandle, topic_resp); 1578 1810 } 1579 1811 else if (type == IOTHUB_TYPE_DEVICE_METHODS) 1580 1812 { 1581 STRING_HANDLE method_name = STRING_new(); 1582 if (method_name == NULL) 1583 { 1584 LogError("Failure: allocating method_name string value"); 1585 } 1586 else 1587 { 1588 DEVICE_METHOD_INFO* dev_method_info = malloc(sizeof(DEVICE_METHOD_INFO)); 1589 if (dev_method_info == NULL) 1590 { 1591 LogError("Failure: allocating DEVICE_METHOD_INFO object"); 1592 } 1593 else 1594 { 1595 dev_method_info->request_id = STRING_new(); 1596 if (dev_method_info->request_id == NULL) 1597 { 1598 LogError("Failure constructing request_id string"); 1599 free(dev_method_info); 1600 } 1601 else if (retrieve_device_method_rid_info(topic_resp, method_name, dev_method_info->request_id) != 0) 1602 { 1603 LogError("Failure: retrieve device topic info"); 1604 STRING_delete(dev_method_info->request_id); 1605 free(dev_method_info); 1606 } 1607 else 1608 { 1609 /* CodesSRS_IOTHUB_MQTT_TRANSPORT_07_053: [ If type is IOTHUB_TYPE_DEVICE_METHODS, then on success mqtt_notification_callback shall call IoTHubClientCore_LL_DeviceMethodComplete. ] */ 1610 const APP_PAYLOAD* payload = mqttmessage_getApplicationMsg(msgHandle); 1611 if (transportData->transport_callbacks.method_complete_cb(STRING_c_str(method_name), payload->message, payload->length, (void*)dev_method_info, transportData->transport_ctx) != 0) 1612 { 1613 LogError("Failure: IoTHubClientCore_LL_DeviceMethodComplete"); 1614 STRING_delete(dev_method_info->request_id); 1615 free(dev_method_info); 1616 } 1617 } 1618 } 1619 STRING_delete(method_name); 1620 } 1813 processDeviceMethodNotification(transportData, msgHandle, topic_resp); 1621 1814 } 1622 1815 else 1623 1816 { 1624 const APP_PAYLOAD* appPayload = mqttmessage_getApplicationMsg(msgHandle); 1625 IOTHUB_MESSAGE_HANDLE IoTHubMessage = IoTHubMessage_CreateFromByteArray(appPayload->message, appPayload->length); 1626 if (IoTHubMessage == NULL) 1627 { 1628 LogError("Failure: IotHub Message creation has failed."); 1629 } 1630 else 1631 { 1632 if ((type == IOTHUB_TYPE_EVENT_QUEUE) && (addInputNamePropertyToMessage(IoTHubMessage, topic_resp) != 0)) 1633 { 1634 LogError("failure adding input name to property."); 1635 } 1636 // Will need to update this when the service has messages that can be rejected 1637 else if (extractMqttProperties(IoTHubMessage, topic_resp, transportData->auto_url_encode_decode) != 0) 1638 { 1639 LogError("failure extracting mqtt properties."); 1640 } 1641 else 1642 { 1643 MESSAGE_CALLBACK_INFO* messageData = (MESSAGE_CALLBACK_INFO*)malloc(sizeof(MESSAGE_CALLBACK_INFO)); 1644 if (messageData == NULL) 1645 { 1646 LogError("malloc failed"); 1647 } 1648 else 1649 { 1650 messageData->messageHandle = IoTHubMessage; 1651 messageData->transportContext = NULL; 1652 1653 if (type == IOTHUB_TYPE_EVENT_QUEUE) 1654 { 1655 // Codes_SRS_IOTHUB_MQTT_TRANSPORT_31_065: [ If type is IOTHUB_TYPE_TELEMETRY and sent to an input queue, then on success `mqtt_notification_callback` shall call `IoTHubClient_LL_MessageCallback`. ] 1656 if (!transportData->transport_callbacks.msg_input_cb(messageData, transportData->transport_ctx)) 1657 { 1658 LogError("IoTHubClientCore_LL_MessageCallbackreturned false"); 1659 1660 IoTHubMessage_Destroy(IoTHubMessage); 1661 free(messageData); 1662 } 1663 } 1664 else 1665 { 1666 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_056: [ If type is IOTHUB_TYPE_TELEMETRY, then on success mqtt_notification_callback shall call IoTHubClientCore_LL_MessageCallback. ] */ 1667 if (!transportData->transport_callbacks.msg_cb(messageData, transportData->transport_ctx)) 1668 { 1669 LogError("IoTHubClientCore_LL_MessageCallback returned false"); 1670 IoTHubMessage_Destroy(IoTHubMessage); 1671 free(messageData); 1672 } 1673 } 1674 } 1675 } 1676 } 1677 } 1678 } 1679 } 1680 } 1681 1682 static void mqtt_operation_complete_callback(MQTT_CLIENT_HANDLE handle, MQTT_CLIENT_EVENT_RESULT actionResult, const void* msgInfo, void* callbackCtx) 1817 processIncomingMessageNotification(transportData, msgHandle, topic_resp, type); 1818 } 1819 } 1820 } 1821 } 1822 1823 // 1824 // mqttOperationCompleteCallback is invoked by umqtt when an operation initiated by the device completes. 1825 // Examples of device initiated operations include PUBLISH, CONNECT, and SUBSCRIBE. 1826 // 1827 static void mqttOperationCompleteCallback(MQTT_CLIENT_HANDLE handle, MQTT_CLIENT_EVENT_RESULT actionResult, const void* msgInfo, void* callbackCtx) 1683 1828 { 1684 1829 (void)handle; … … 1705 1850 { 1706 1851 (void)DList_RemoveEntryList(currentListEntry); //First remove the item from Waiting for Ack List. 1707 sendMsgComplete(mqttMsgEntry->iotHubMessageEntry, transport_data, IOTHUB_CLIENT_CONFIRMATION_OK);1852 notifyApplicationOfSendMessageComplete(mqttMsgEntry->iotHubMessageEntry, transport_data, IOTHUB_CLIENT_CONFIRMATION_OK); 1708 1853 free(mqttMsgEntry); 1709 1854 } … … 1753 1898 transport_data->isRecoverableError = false; 1754 1899 } 1755 LogError("Connection Not Accepted: 0x%x: %s", connack->returnCode, retrieve _mqtt_return_codes(connack->returnCode));1900 LogError("Connection Not Accepted: 0x%x: %s", connack->returnCode, retrieveMqttReturnCodes(connack->returnCode)); 1756 1901 transport_data->mqttClientStatus = MQTT_CLIENT_STATUS_PENDING_CLOSE; 1757 1902 transport_data->currPacketState = PACKET_TYPE_ERROR; … … 1822 1967 { 1823 1968 OPTIONHANDLER_HANDLE options = xio_retrieveoptions(transport_data->xioTransport); 1824 set _saved_tls_options(transport_data, options);1969 setSavedTlsOptions(transport_data, options); 1825 1970 DestroyXioTransport(transport_data); 1826 1971 } 1827 1972 } 1828 1973 1829 static void mqtt_disconnect_cb(void* ctx) 1974 // 1975 // processDisconnectCallback is a callback invoked by umqtt to signal that the disconnection has completed. 1976 // 1977 static void processDisconnectCallback(void* ctx) 1830 1978 { 1831 1979 if (ctx != NULL) … … 1836 1984 } 1837 1985 1986 // 1987 // DisconnectFromClient will tear down the existing MQTT connection, trying to gracefully send an MQTT DISCONNECT (with a timeout), 1988 // destroy the underlying xio for network communication, and update the transport_data state machine appropriately. 1989 // 1990 //NOTE: After a call to DisconnectFromClient, determine if appropriate to also call 1991 // transport_data->transport_callbacks.connection_status_cb(). 1838 1992 static void DisconnectFromClient(PMQTTTRANSPORT_HANDLE_DATA transport_data) 1839 1993 { … … 1843 1997 { 1844 1998 OPTIONHANDLER_HANDLE options = xio_retrieveoptions(transport_data->xioTransport); 1845 set _saved_tls_options(transport_data, options);1999 setSavedTlsOptions(transport_data, options); 1846 2000 } 1847 2001 // Ensure the disconnect message is sent … … 1849 2003 { 1850 2004 transport_data->disconnect_recv_flag = 0; 1851 (void)mqtt_client_disconnect(transport_data->mqttClient, mqtt_disconnect_cb, &transport_data->disconnect_recv_flag);2005 (void)mqtt_client_disconnect(transport_data->mqttClient, processDisconnectCallback, &transport_data->disconnect_recv_flag); 1852 2006 size_t disconnect_ctr = 0; 1853 2007 do … … 1866 2020 } 1867 2021 1868 static void mqtt_error_callback(MQTT_CLIENT_HANDLE handle, MQTT_CLIENT_EVENT_ERROR error, void* callbackCtx) 2022 // 2023 // processErrorCallback is invoked by umqtt when an error has occurred. 2024 // 2025 static void processErrorCallback(MQTT_CLIENT_HANDLE handle, MQTT_CLIENT_EVENT_ERROR error, void* callbackCtx) 1869 2026 { 1870 2027 (void)handle; … … 1935 2092 } 1936 2093 2094 // 2095 // SubscribeToMqttProtocol determines which topics we should SUBSCRIBE to, based on existing state, and then 2096 // invokes the underlying umqtt layer to send the SUBSCRIBE across the network. 2097 // 1937 2098 static void SubscribeToMqttProtocol(PMQTTTRANSPORT_HANDLE_DATA transport_data) 1938 2099 { … … 1941 2102 uint32_t topic_subscription = 0; 1942 2103 size_t subscribe_count = 0; 1943 uint16_t packet_id = get _next_packet_id(transport_data);2104 uint16_t packet_id = getNextPacketId(transport_data); 1944 2105 SUBSCRIBE_PAYLOAD subscribe[SUBSCRIBE_TOPIC_COUNT]; 1945 2106 if ((transport_data->topic_MqttMessage != NULL) && (SUBSCRIBE_TELEMETRY_TOPIC & transport_data->topics_ToSubscribe)) … … 2008 2169 } 2009 2170 2171 // 2172 // RetrieveMessagePayload translates the payload set by the application in messageHandle into payload/length for sending across the network. 2173 // 2010 2174 static bool RetrieveMessagePayload(IOTHUB_MESSAGE_HANDLE messageHandle, const unsigned char** payload, size_t* length) 2011 2175 { … … 2048 2212 } 2049 2213 2050 static void process_queued_ack_messages(PMQTTTRANSPORT_HANDLE_DATA transport_data) 2214 // 2215 // ProcessPendingTelemetryMessages examines each telemetry message the device/module has sent that hasn't yet been PUBACK'd. 2216 // For each message, it might: 2217 // * Ignore it, if its timeout has not yet been reached. 2218 // * Attempt to retry PUBLISH the message, if has remaining retries left. 2219 // * Stop attempting to send the message. This will result in tearing down the underlying MQTT/TCP connection because it indicates 2220 // something is wrong. 2221 // 2222 static void ProcessPendingTelemetryMessages(PMQTTTRANSPORT_HANDLE_DATA transport_data) 2051 2223 { 2052 2224 PDLIST_ENTRY current_entry = transport_data->telemetry_waitingForAck.Flink; … … 2063 2235 if (msg_detail_entry->retryCount >= MAX_SEND_RECOUNT_LIMIT) 2064 2236 { 2065 sendMsgComplete(msg_detail_entry->iotHubMessageEntry, transport_data, IOTHUB_CLIENT_CONFIRMATION_MESSAGE_TIMEOUT);2237 notifyApplicationOfSendMessageComplete(msg_detail_entry->iotHubMessageEntry, transport_data, IOTHUB_CLIENT_CONFIRMATION_MESSAGE_TIMEOUT); 2066 2238 (void)DList_RemoveEntryList(current_entry); 2067 2239 free(msg_detail_entry); 2068 2240 2069 2241 DisconnectFromClient(transport_data); 2242 if (!transport_data->isRetryExpiredCallbackCalled) // Only call once 2243 { 2244 transport_data->transport_callbacks.connection_status_cb(IOTHUB_CLIENT_CONNECTION_UNAUTHENTICATED, IOTHUB_CLIENT_CONNECTION_RETRY_EXPIRED, transport_data->transport_ctx); 2245 transport_data->isRetryExpiredCallbackCalled = true; 2246 } 2070 2247 } 2071 2248 else … … 2080 2257 { 2081 2258 (void)DList_RemoveEntryList(current_entry); 2082 sendMsgComplete(msg_detail_entry->iotHubMessageEntry, transport_data, IOTHUB_CLIENT_CONFIRMATION_ERROR);2259 notifyApplicationOfSendMessageComplete(msg_detail_entry->iotHubMessageEntry, transport_data, IOTHUB_CLIENT_CONFIRMATION_ERROR); 2083 2260 } 2084 2261 else 2085 2262 { 2086 if (publish _mqtt_telemetry_msg(transport_data, msg_detail_entry, messagePayload, messageLength) != 0)2263 if (publishTelemetryMsg(transport_data, msg_detail_entry, messagePayload, messageLength) != 0) 2087 2264 { 2088 2265 (void)DList_RemoveEntryList(current_entry); 2089 sendMsgComplete(msg_detail_entry->iotHubMessageEntry, transport_data, IOTHUB_CLIENT_CONFIRMATION_ERROR);2266 notifyApplicationOfSendMessageComplete(msg_detail_entry->iotHubMessageEntry, transport_data, IOTHUB_CLIENT_CONFIRMATION_ERROR); 2090 2267 free(msg_detail_entry); 2091 2268 } … … 2103 2280 } 2104 2281 2105 static int GetTransportProviderIfNecessary(PMQTTTRANSPORT_HANDLE_DATA transport_data) 2282 // 2283 // CreateTransportProviderIfNecessary will create the underlying xioTransport (which handles networking I/O) and 2284 // set its options, assuming the xioTransport does not already exist. 2285 // 2286 static int CreateTransportProviderIfNecessary(PMQTTTRANSPORT_HANDLE_DATA transport_data) 2106 2287 { 2107 2288 int result; … … 2138 2319 { 2139 2320 // The tlsio has the options, so our copy can be deleted 2140 set _saved_tls_options(transport_data, NULL);2321 setSavedTlsOptions(transport_data, NULL); 2141 2322 result = 0; 2142 2323 } … … 2167 2348 } 2168 2349 2350 // 2351 // buildClientId creates the MQTT ClientId of this device or module. 2352 // 2169 2353 static STRING_HANDLE buildClientId(const char* device_id, const char* module_id) 2170 2354 { … … 2179 2363 } 2180 2364 2365 // 2366 // buildConfigForUsernameStep2IfNeeded builds the MQTT username. IoT Hub uses the query string of the userName to optionally 2367 // specify SDK information, product information optionally specified by the application, and optionally the PnP ModelId. 2368 // 2181 2369 static int buildConfigForUsernameStep2IfNeeded(PMQTTTRANSPORT_HANDLE_DATA transport_data) 2182 2370 { … … 2191 2379 // TODO: The preview API version in SDK is only scoped to scenarios that require the modelId to be set. 2192 2380 // https://github.com/Azure/azure-iot-sdk-c/issues/1547 tracks removing this once non-preview API versions support modelId. 2193 const char* apiVersion = (modelId != NULL) ? IOTHUB_API_PREVIEW_VERSION :IOTHUB_API_VERSION;2381 const char* apiVersion = IOTHUB_API_VERSION; 2194 2382 const char* appSpecifiedProductInfo = transport_data->transport_callbacks.prod_info_cb(transport_data->transport_ctx); 2195 STRING_HANDLE productInfoEncoded = NULL; 2383 STRING_HANDLE productInfoEncoded = NULL; 2196 2384 2197 2385 if ((productInfoEncoded = URL_EncodeString((appSpecifiedProductInfo != NULL) ? appSpecifiedProductInfo : DEFAULT_IOTHUB_PRODUCT_IDENTIFIER)) == NULL) … … 2254 2442 } 2255 2443 2444 // 2445 // SendMqttConnectMsg sends the MQTT CONNECT message across the network. This function may also 2446 // perform security functionality for building up the required token (optionally invoking into DPS if configured to do so) 2447 // 2256 2448 static int SendMqttConnectMsg(PMQTTTRANSPORT_HANDLE_DATA transport_data) 2257 2449 { … … 2322 2514 options.qualityOfServiceValue = DELIVER_AT_LEAST_ONCE; 2323 2515 2324 if ( GetTransportProviderIfNecessary(transport_data) == 0)2516 if (CreateTransportProviderIfNecessary(transport_data) == 0) 2325 2517 { 2326 2518 transport_data->conn_attempted = true; … … 2333 2525 { 2334 2526 transport_data->currPacketState = CONNECT_TYPE; 2335 transport_data->isRetryExpiredCallback Set= false;2527 transport_data->isRetryExpiredCallbackCalled = false; 2336 2528 (void)tickcounter_get_current_ms(transport_data->msgTickCounter, &transport_data->mqtt_connect_time); 2337 2529 result = 0; … … 2353 2545 } 2354 2546 2355 static int InitializeConnection(PMQTTTRANSPORT_HANDLE_DATA transport_data) 2547 // 2548 // UpdateMqttConnectionStateIfNeeded is used for updating MQTT's underlying connection status during a DoWork loop. 2549 // Among this function's responsibilities: 2550 // * Attempt to establish an MQTT connection if one has not been already. 2551 // * Retries failed connection, if in the correct state. 2552 // * Processes deferred disconnect requests 2553 // * Checks timeouts, for instance on connection establishment time as well as SaS token lifetime (if SAS used) 2554 static int UpdateMqttConnectionStateIfNeeded(PMQTTTRANSPORT_HANDLE_DATA transport_data) 2356 2555 { 2357 2556 int result = 0; … … 2393 2592 { 2394 2593 // Set callback if retry expired 2395 if (!transport_data->isRetryExpiredCallback Set)2594 if (!transport_data->isRetryExpiredCallbackCalled) 2396 2595 { 2397 2596 transport_data->transport_callbacks.connection_status_cb(IOTHUB_CLIENT_CONNECTION_UNAUTHENTICATED, IOTHUB_CLIENT_CONNECTION_RETRY_EXPIRED, transport_data->transport_ctx); 2398 transport_data->isRetryExpiredCallback Set= true;2597 transport_data->isRetryExpiredCallbackCalled = true; 2399 2598 } 2400 2599 result = MU_FAILURE; … … 2443 2642 if (cred_type != IOTHUB_CREDENTIAL_TYPE_X509 && cred_type != IOTHUB_CREDENTIAL_TYPE_X509_ECC) 2444 2643 { 2445 size_t sas_token_expiry = IoTHubClient_Auth_Get_SasToken_Expiry(transport_data->authorization_module);2644 uint64_t sas_token_expiry = IoTHubClient_Auth_Get_SasToken_Expiry(transport_data->authorization_module); 2446 2645 if ((current_time - transport_data->mqtt_connect_time) / 1000 > (sas_token_expiry*SAS_REFRESH_MULTIPLIER)) 2447 2646 { … … 2494 2693 } 2495 2694 2695 // 2696 // buildMqttEventString creates the MQTT topic for this device (and optionally module) to PUBLISH telemetry to. 2697 // 2496 2698 static STRING_HANDLE buildMqttEventString(const char* device_id, const char* module_id) 2497 2699 { … … 2506 2708 } 2507 2709 2710 // 2711 // buildDevicesAndModulesPath builds the path used when generating a SaS token for this request. 2712 // 2508 2713 static STRING_HANDLE buildDevicesAndModulesPath(const IOTHUB_CLIENT_CONFIG* upperConfig, const char* moduleId) 2509 2714 { … … 2518 2723 } 2519 2724 2725 // 2726 // buildTopicMqttMsg builds the MQTT topic that is used for C2D messages sent to a device or module-to-module messages for a module running in IoT Edge 2727 // 2728 static STRING_HANDLE buildTopicMqttMsg(const char* device_id, const char* module_id) 2729 { 2730 if (module_id == NULL) 2731 { 2732 return STRING_construct_sprintf(TOPIC_DEVICE_MSG, device_id); 2733 } 2734 else 2735 { 2736 return STRING_construct_sprintf(TOPIC_DEVICE_MODULE_MSG, device_id, module_id); 2737 } 2738 } 2739 2740 // 2741 // checkModuleIdsEqual verifies that module Ids coming from different upper layers are equal. 2742 // 2743 static bool checkModuleIdsEqual(const char* transportModuleId, const char* deviceModuleId) 2744 { 2745 if ((transportModuleId != NULL) && (deviceModuleId == NULL)) 2746 { 2747 return false; 2748 } 2749 else if ((transportModuleId == NULL) && (deviceModuleId != NULL)) 2750 { 2751 return false; 2752 } 2753 else if ((transportModuleId == NULL) && (deviceModuleId == NULL)) 2754 { 2755 return true; 2756 } 2757 else 2758 { 2759 return (0 == strcmp(transportModuleId, deviceModuleId)); 2760 } 2761 } 2762 2763 // 2764 // InitializeTransportHandleData creates a MQTTTRANSPORT_HANDLE_DATA. 2765 // 2520 2766 static PMQTTTRANSPORT_HANDLE_DATA InitializeTransportHandleData(const IOTHUB_CLIENT_CONFIG* upperConfig, PDLIST_ENTRY waitingToSend, IOTHUB_AUTHORIZATION_HANDLE auth_module, const char* moduleId) 2521 2767 { … … 2531 2777 { 2532 2778 LogError("Invalid Argument: iotHubName is empty"); 2533 free _transport_handle_data(state);2779 freeTransportHandleData(state); 2534 2780 state = NULL; 2535 2781 } … … 2538 2784 { 2539 2785 LogError("Failed creating default retry control"); 2540 free _transport_handle_data(state);2786 freeTransportHandleData(state); 2541 2787 state = NULL; 2542 2788 } … … 2544 2790 { 2545 2791 LogError("failure constructing device_id."); 2546 free _transport_handle_data(state);2792 freeTransportHandleData(state); 2547 2793 state = NULL; 2548 2794 } … … 2550 2796 { 2551 2797 LogError("failure constructing module_id."); 2552 free _transport_handle_data(state);2798 freeTransportHandleData(state); 2553 2799 state = NULL; 2554 2800 } … … 2556 2802 { 2557 2803 LogError("failure constructing devicesPath."); 2558 free _transport_handle_data(state);2804 freeTransportHandleData(state); 2559 2805 state = NULL; 2560 2806 } … … 2564 2810 { 2565 2811 LogError("Could not create topic_MqttEvent for MQTT"); 2566 free _transport_handle_data(state);2812 freeTransportHandleData(state); 2567 2813 state = NULL; 2568 2814 } 2569 2815 else 2570 2816 { 2571 state->mqttClient = mqtt_client_init(mqtt _notification_callback, mqtt_operation_complete_callback, state, mqtt_error_callback, state);2817 state->mqttClient = mqtt_client_init(mqttNotificationCallback, mqttOperationCompleteCallback, state, processErrorCallback, state); 2572 2818 if (state->mqttClient == NULL) 2573 2819 { 2574 2820 LogError("failure initializing mqtt client."); 2575 free _transport_handle_data(state);2821 freeTransportHandleData(state); 2576 2822 state = NULL; 2577 2823 } … … 2591 2837 { 2592 2838 LogError("failure constructing host address."); 2593 free _transport_handle_data(state);2839 freeTransportHandleData(state); 2594 2840 state = NULL; 2595 2841 } 2596 2842 else if ((state->configPassedThroughUsername = buildConfigForUsernameStep1(upperConfig, moduleId)) == NULL) 2597 2843 { 2598 free _transport_handle_data(state);2844 freeTransportHandleData(state); 2599 2845 state = NULL; 2600 2846 } … … 2617 2863 2618 2864 state->isDestroyCalled = false; 2619 state->isRetryExpiredCallback Set= false;2865 state->isRetryExpiredCallbackCalled = false; 2620 2866 state->isRegistered = false; 2621 2867 state->device_twin_get_sent = false; … … 2641 2887 } 2642 2888 2889 // 2890 // ProcessSubackDoWork processes state transitions responding to a SUBACK packet. 2891 // This does NOT occur once we receive the SUBACK packet immediately; instead the work is 2892 // deferred to the DoWork loop. 2893 // 2894 static void ProcessSubackDoWork(PMQTTTRANSPORT_HANDLE_DATA transport_data) 2895 { 2896 if ((transport_data->topic_NotifyState != NULL || transport_data->topic_GetState != NULL) && 2897 !transport_data->device_twin_get_sent) 2898 { 2899 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_055: [ IoTHubTransport_MQTT_Common_DoWork shall send a device twin get property message upon successfully retrieving a SUBACK on device twin topics. ] */ 2900 MQTT_DEVICE_TWIN_ITEM* mqtt_info; 2901 2902 if ((mqtt_info = createDeviceTwinMsg(transport_data, RETRIEVE_PROPERTIES, 0)) == NULL) 2903 { 2904 LogError("Failure: could not create message for twin get command"); 2905 } 2906 else if (publishDeviceTwinGetMsg(transport_data, mqtt_info) == 0) 2907 { 2908 transport_data->device_twin_get_sent = true; 2909 } 2910 else 2911 { 2912 LogError("Failure: sending device twin get command."); 2913 destroyDeviceTwinGetMsg(mqtt_info); 2914 } 2915 } 2916 2917 // Publish can be called now and in any event we need to transition out of this state. 2918 transport_data->currPacketState = PUBLISH_TYPE; 2919 } 2920 2921 // 2922 // ProcessPublishStateDoWork traverses all messages waiting to be sent and attempts to PUBLISH them. 2923 // 2924 static void ProcessPublishStateDoWork(PMQTTTRANSPORT_HANDLE_DATA transport_data) 2925 { 2926 PDLIST_ENTRY currentListEntry = transport_data->waitingToSend->Flink; 2927 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_027: [IoTHubTransport_MQTT_Common_DoWork shall inspect the "waitingToSend" DLIST passed in config structure.] */ 2928 while (currentListEntry != transport_data->waitingToSend) 2929 { 2930 IOTHUB_MESSAGE_LIST* iothubMsgList = containingRecord(currentListEntry, IOTHUB_MESSAGE_LIST, entry); 2931 DLIST_ENTRY savedFromCurrentListEntry; 2932 savedFromCurrentListEntry.Flink = currentListEntry->Flink; 2933 2934 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_027: [IoTHubTransport_MQTT_Common_DoWork shall inspect the "waitingToSend" DLIST passed in config structure.] */ 2935 size_t messageLength; 2936 const unsigned char* messagePayload = NULL; 2937 if (!RetrieveMessagePayload(iothubMsgList->messageHandle, &messagePayload, &messageLength)) 2938 { 2939 (void)(DList_RemoveEntryList(currentListEntry)); 2940 notifyApplicationOfSendMessageComplete(iothubMsgList, transport_data, IOTHUB_CLIENT_CONFIRMATION_ERROR); 2941 LogError("Failure result from IoTHubMessage_GetData"); 2942 } 2943 else 2944 { 2945 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_029: [IoTHubTransport_MQTT_Common_DoWork shall create a MQTT_MESSAGE_HANDLE and pass this to a call to mqtt_client_publish.] */ 2946 MQTT_MESSAGE_DETAILS_LIST* mqttMsgEntry = (MQTT_MESSAGE_DETAILS_LIST*)malloc(sizeof(MQTT_MESSAGE_DETAILS_LIST)); 2947 if (mqttMsgEntry == NULL) 2948 { 2949 LogError("Allocation Error: Failure allocating MQTT Message Detail List."); 2950 } 2951 else 2952 { 2953 mqttMsgEntry->retryCount = 0; 2954 mqttMsgEntry->iotHubMessageEntry = iothubMsgList; 2955 mqttMsgEntry->packet_id = getNextPacketId(transport_data); 2956 if (publishTelemetryMsg(transport_data, mqttMsgEntry, messagePayload, messageLength) != 0) 2957 { 2958 (void)(DList_RemoveEntryList(currentListEntry)); 2959 notifyApplicationOfSendMessageComplete(iothubMsgList, transport_data, IOTHUB_CLIENT_CONFIRMATION_ERROR); 2960 free(mqttMsgEntry); 2961 } 2962 else 2963 { 2964 // Remove the message from the waiting queue ... 2965 (void)(DList_RemoveEntryList(currentListEntry)); 2966 // and add it to the ack queue 2967 DList_InsertTailList(&(transport_data->telemetry_waitingForAck), &(mqttMsgEntry->entry)); 2968 } 2969 } 2970 } 2971 currentListEntry = savedFromCurrentListEntry.Flink; 2972 } 2973 2974 if (transport_data->twin_resp_sub_recv) 2975 { 2976 sendPendingGetTwinRequests(transport_data); 2977 } 2978 } 2979 2643 2980 TRANSPORT_LL_HANDLE IoTHubTransport_MQTT_Common_Create(const IOTHUBTRANSPORT_CONFIG* config, MQTT_GET_IO_TRANSPORT get_io_transport, TRANSPORT_CALLBACKS_INFO* cb_info, void* ctx) 2644 2981 { … … 2741 3078 PDLIST_ENTRY currentEntry = DList_RemoveHeadList(&transport_data->telemetry_waitingForAck); 2742 3079 MQTT_MESSAGE_DETAILS_LIST* mqttMsgEntry = containingRecord(currentEntry, MQTT_MESSAGE_DETAILS_LIST, entry); 2743 sendMsgComplete(mqttMsgEntry->iotHubMessageEntry, transport_data, IOTHUB_CLIENT_CONFIRMATION_BECAUSE_DESTROY);3080 notifyApplicationOfSendMessageComplete(mqttMsgEntry->iotHubMessageEntry, transport_data, IOTHUB_CLIENT_CONFIRMATION_BECAUSE_DESTROY); 2744 3081 free(mqttMsgEntry); 2745 3082 } … … 2758 3095 } 2759 3096 2760 destroy _device_twin_get_message(mqtt_device_twin);3097 destroyDeviceTwinGetMsg(mqtt_device_twin); 2761 3098 } 2762 3099 while (!DList_IsListEmpty(&transport_data->pending_get_twin_queue)) … … 2768 3105 mqtt_device_twin->userCallback(DEVICE_TWIN_UPDATE_COMPLETE, NULL, 0, mqtt_device_twin->userContext); 2769 3106 2770 destroy _device_twin_get_message(mqtt_device_twin);3107 destroyDeviceTwinGetMsg(mqtt_device_twin); 2771 3108 } 2772 3109 2773 3110 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_014: [IoTHubTransport_MQTT_Common_Destroy shall free all the resources currently in use.] */ 2774 3111 /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_01_012: [ `IoTHubTransport_MQTT_Common_Destroy` shall free the stored proxy options. ]*/ 2775 free _transport_handle_data(transport_data);3112 freeTransportHandleData(transport_data); 2776 3113 } 2777 3114 } … … 2860 3197 MQTT_DEVICE_TWIN_ITEM* mqtt_info; 2861 3198 2862 if ((mqtt_info = create _device_twin_message(transport_data, RETRIEVE_PROPERTIES, 0)) == NULL)3199 if ((mqtt_info = createDeviceTwinMsg(transport_data, RETRIEVE_PROPERTIES, 0)) == NULL) 2863 3200 { 2864 3201 LogError("Failed creating the device twin get request message"); … … 2869 3206 { 2870 3207 LogError("Failed setting the get twin request enqueue time"); 2871 destroy _device_twin_get_message(mqtt_info);3208 destroyDeviceTwinGetMsg(mqtt_info); 2872 3209 // Codes_SRS_IOTHUB_MQTT_TRANSPORT_09_003: [ If any failure occurs, IoTHubTransport_MQTT_Common_GetTwinAsync shall return IOTHUB_CLIENT_ERROR ] 2873 3210 result = IOTHUB_CLIENT_ERROR; … … 2949 3286 2950 3287 /*Codes_SRS_IOTHUB_MQTT_TRANSPORT_12_011 : [IoTHubTransport_MQTT_Common_Unsubscribe_DeviceMethod shall send the unsubscribe.]*/ 2951 if (mqtt_client_unsubscribe(transport_data->mqttClient, get _next_packet_id(transport_data), unsubscribe, 1) != 0)3288 if (mqtt_client_unsubscribe(transport_data->mqttClient, getNextPacketId(transport_data), unsubscribe, 1) != 0) 2952 3289 { 2953 3290 LogError("Failure calling mqtt_client_unsubscribe"); … … 2981 3318 else 2982 3319 { 2983 if (publish _device_method_message(transport_data, status, dev_method_info->request_id, response, respSize) != 0)3320 if (publishDeviceMethodResponseMsg(transport_data, status, dev_method_info->request_id, response, respSize) != 0) 2984 3321 { 2985 3322 /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_07_051: [ If any error is encountered, IoTHubTransport_MQTT_Common_DeviceMethod_Response shall return a non-zero value. ] */ … … 3004 3341 } 3005 3342 3006 static STRING_HANDLE buildTopicMqttMessage(const char* device_id, const char* module_id)3007 {3008 if (module_id == NULL)3009 {3010 return STRING_construct_sprintf(TOPIC_DEVICE_MSG, device_id);3011 }3012 else3013 {3014 return STRING_construct_sprintf(TOPIC_DEVICE_MODULE_MSG, device_id, module_id);3015 }3016 }3017 3018 3343 int IoTHubTransport_MQTT_Common_Subscribe(TRANSPORT_LL_HANDLE handle) 3019 3344 { … … 3029 3354 { 3030 3355 /* Code_SRS_IOTHUB_MQTT_TRANSPORT_07_016: [IoTHubTransport_MQTT_Common_Subscribe shall set a flag to enable mqtt_client_subscribe to be called to subscribe to the Message Topic.] */ 3031 transport_data->topic_MqttMessage = buildTopicMqttM essage(STRING_c_str(transport_data->device_id), STRING_c_str(transport_data->module_id));3356 transport_data->topic_MqttMessage = buildTopicMqttMsg(STRING_c_str(transport_data->device_id), STRING_c_str(transport_data->module_id)); 3032 3357 if (transport_data->topic_MqttMessage == NULL) 3033 3358 { … … 3055 3380 const char* unsubscribe[1]; 3056 3381 unsubscribe[0] = STRING_c_str(transport_data->topic_MqttMessage); 3057 if (mqtt_client_unsubscribe(transport_data->mqttClient, get _next_packet_id(transport_data), unsubscribe, 1) != 0)3382 if (mqtt_client_unsubscribe(transport_data->mqttClient, getNextPacketId(transport_data), unsubscribe, 1) != 0) 3058 3383 { 3059 3384 LogError("Failure calling mqtt_client_unsubscribe"); … … 3087 3412 if (item_type == IOTHUB_TYPE_DEVICE_TWIN && transport_data->twin_resp_sub_recv) 3088 3413 { 3089 MQTT_DEVICE_TWIN_ITEM* mqtt_info = create _device_twin_message(transport_data, REPORTED_STATE, iothub_item->device_twin->item_id);3414 MQTT_DEVICE_TWIN_ITEM* mqtt_info = createDeviceTwinMsg(transport_data, REPORTED_STATE, iothub_item->device_twin->item_id); 3090 3415 if (mqtt_info == NULL) 3091 3416 { … … 3099 3424 DList_InsertTailList(&transport_data->ack_waiting_queue, &mqtt_info->entry); 3100 3425 3101 if (publish _device_twin_message(transport_data, iothub_item->device_twin, mqtt_info) != 0)3426 if (publishDeviceTwinMsg(transport_data, iothub_item->device_twin, mqtt_info) != 0) 3102 3427 { 3103 3428 DList_RemoveEntryList(&mqtt_info->entry); … … 3135 3460 if (transport_data != NULL) 3136 3461 { 3137 if ( InitializeConnection(transport_data) == 0)3462 if (UpdateMqttConnectionStateIfNeeded(transport_data) == 0) 3138 3463 { 3139 3464 if (transport_data->mqttClientStatus == MQTT_CLIENT_STATUS_PENDING_CLOSE) … … 3148 3473 else if (transport_data->currPacketState == SUBACK_TYPE) 3149 3474 { 3150 if ((transport_data->topic_NotifyState != NULL || transport_data->topic_GetState != NULL) && 3151 !transport_data->device_twin_get_sent) 3152 { 3153 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_055: [ IoTHubTransport_MQTT_Common_DoWork shall send a device twin get property message upon successfully retrieving a SUBACK on device twin topics. ] */ 3154 MQTT_DEVICE_TWIN_ITEM* mqtt_info; 3155 3156 if ((mqtt_info = create_device_twin_message(transport_data, RETRIEVE_PROPERTIES, 0)) == NULL) 3157 { 3158 LogError("Failure: could not create message for twin get command"); 3159 } 3160 else if (publish_device_twin_get_message(transport_data, mqtt_info) == 0) 3161 { 3162 transport_data->device_twin_get_sent = true; 3163 } 3164 else 3165 { 3166 LogError("Failure: sending device twin get property command."); 3167 destroy_device_twin_get_message(mqtt_info); 3168 } 3169 } 3170 // Publish can be called now 3171 transport_data->currPacketState = PUBLISH_TYPE; 3475 ProcessSubackDoWork(transport_data); 3172 3476 } 3173 3477 else if (transport_data->currPacketState == PUBLISH_TYPE) 3174 3478 { 3175 PDLIST_ENTRY currentListEntry = transport_data->waitingToSend->Flink; 3176 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_027: [IoTHubTransport_MQTT_Common_DoWork shall inspect the "waitingToSend" DLIST passed in config structure.] */ 3177 while (currentListEntry != transport_data->waitingToSend) 3178 { 3179 IOTHUB_MESSAGE_LIST* iothubMsgList = containingRecord(currentListEntry, IOTHUB_MESSAGE_LIST, entry); 3180 DLIST_ENTRY savedFromCurrentListEntry; 3181 savedFromCurrentListEntry.Flink = currentListEntry->Flink; 3182 3183 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_027: [IoTHubTransport_MQTT_Common_DoWork shall inspect the "waitingToSend" DLIST passed in config structure.] */ 3184 size_t messageLength; 3185 const unsigned char* messagePayload = NULL; 3186 if (!RetrieveMessagePayload(iothubMsgList->messageHandle, &messagePayload, &messageLength)) 3187 { 3188 (void)(DList_RemoveEntryList(currentListEntry)); 3189 sendMsgComplete(iothubMsgList, transport_data, IOTHUB_CLIENT_CONFIRMATION_ERROR); 3190 LogError("Failure result from IoTHubMessage_GetData"); 3191 } 3192 else 3193 { 3194 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_029: [IoTHubTransport_MQTT_Common_DoWork shall create a MQTT_MESSAGE_HANDLE and pass this to a call to mqtt_client_publish.] */ 3195 MQTT_MESSAGE_DETAILS_LIST* mqttMsgEntry = (MQTT_MESSAGE_DETAILS_LIST*)malloc(sizeof(MQTT_MESSAGE_DETAILS_LIST)); 3196 if (mqttMsgEntry == NULL) 3197 { 3198 LogError("Allocation Error: Failure allocating MQTT Message Detail List."); 3199 } 3200 else 3201 { 3202 mqttMsgEntry->retryCount = 0; 3203 mqttMsgEntry->iotHubMessageEntry = iothubMsgList; 3204 mqttMsgEntry->packet_id = get_next_packet_id(transport_data); 3205 if (publish_mqtt_telemetry_msg(transport_data, mqttMsgEntry, messagePayload, messageLength) != 0) 3206 { 3207 (void)(DList_RemoveEntryList(currentListEntry)); 3208 sendMsgComplete(iothubMsgList, transport_data, IOTHUB_CLIENT_CONFIRMATION_ERROR); 3209 free(mqttMsgEntry); 3210 } 3211 else 3212 { 3213 // Remove the message from the waiting queue ... 3214 (void)(DList_RemoveEntryList(currentListEntry)); 3215 // and add it to the ack queue 3216 DList_InsertTailList(&(transport_data->telemetry_waitingForAck), &(mqttMsgEntry->entry)); 3217 } 3218 } 3219 } 3220 currentListEntry = savedFromCurrentListEntry.Flink; 3221 } 3222 3223 if (transport_data->twin_resp_sub_recv) 3224 { 3225 sendPendingGetTwinRequests(transport_data); 3226 } 3479 ProcessPublishStateDoWork(transport_data); 3227 3480 } 3228 3481 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_030: [IoTHubTransport_MQTT_Common_DoWork shall call mqtt_client_dowork everytime it is called if it is connected.] */ … … 3231 3484 3232 3485 // Check the ack messages timeouts 3233 process_queued_ack_messages(transport_data);3486 ProcessPendingTelemetryMessages(transport_data); 3234 3487 removeExpiredTwinRequests(transport_data); 3235 3488 } 3236 3489 } 3237 3490 3491 int IoTHubTransport_MQTT_Common_SetRetryPolicy(TRANSPORT_LL_HANDLE handle, IOTHUB_CLIENT_RETRY_POLICY retryPolicy, size_t retryTimeoutLimitInSeconds) 3492 { 3493 int result; 3494 3495 if (handle == NULL) 3496 { 3497 /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_25_041: [**If any handle is NULL then IoTHubTransport_MQTT_Common_SetRetryPolicy shall return resultant line.] */ 3498 LogError("Invalid handle parameter. NULL."); 3499 result = MU_FAILURE; 3500 } 3501 else 3502 { 3503 RETRY_CONTROL_HANDLE new_retry_control_handle; 3504 3505 // Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_09_006: [ IoTHubTransport_MQTT_Common_SetRetryPolicy shall set the retry logic by calling retry_control_create() with retry policy and retryTimeout as parameters] 3506 // Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_09_009: [ If retry_control_create() fails then IoTHubTransport_MQTT_Common_SetRetryPolicy shall revert to previous retry policy and return non-zero value ] 3507 if ((new_retry_control_handle = retry_control_create(retryPolicy, (unsigned int)retryTimeoutLimitInSeconds)) == NULL) 3508 { 3509 LogError("Failed creating new retry control handle"); 3510 result = MU_FAILURE; 3511 } 3512 else 3513 { 3514 PMQTTTRANSPORT_HANDLE_DATA transport_data = (PMQTTTRANSPORT_HANDLE_DATA)handle; 3515 RETRY_CONTROL_HANDLE previous_retry_control_handle = transport_data->retry_control_handle; 3516 3517 transport_data->retry_control_handle = new_retry_control_handle; 3518 retry_control_destroy(previous_retry_control_handle); 3519 3520 /*Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_25_045: [**If retry logic for specified parameters of retry policy and retryTimeoutLimitInSeconds is created successfully then IoTHubTransport_MQTT_Common_SetRetryPolicy shall return 0]*/ 3521 result = 0; 3522 } 3523 } 3524 3525 return result; 3526 } 3527 3528 3238 3529 IOTHUB_CLIENT_RESULT IoTHubTransport_MQTT_Common_GetSendStatus(TRANSPORT_LL_HANDLE handle, IOTHUB_CLIENT_STATUS *iotHubClientStatus) 3239 3530 { … … 3242 3533 if (handle == NULL || iotHubClientStatus == NULL) 3243 3534 { 3244 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_023: [IoTHubTransport_MQTT_Common_GetSendStatus shall return IOTHUB_CLIENT_INVALID_ARG if called with NULL parameter.] */3245 LogError("invalid arument.");3246 result = IOTHUB_CLIENT_INVALID_ARG;3247 } 3248 else 3249 { 3250 MQTTTRANSPORT_HANDLE_DATA* handleData = (MQTTTRANSPORT_HANDLE_DATA*)handle;3251 if (!DList_IsListEmpty(handleData->waitingToSend) || !DList_IsListEmpty(&(handleData->telemetry_waitingForAck)))3252 {3253 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_025: [IoTHubTransport_MQTT_Common_GetSendStatus shall return IOTHUB_CLIENT_OK and status IOTHUB_CLIENT_SEND_STATUS_BUSY if there are currently event items to be sent or being sent.] */3254 *iotHubClientStatus = IOTHUB_CLIENT_SEND_STATUS_BUSY;3255 }3256 else3257 {3258 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_024: [IoTHubTransport_MQTT_Common_GetSendStatus shall return IOTHUB_CLIENT_OK and status IOTHUB_CLIENT_SEND_STATUS_IDLE if there are currently no event items to be sent or being sent.] */3259 *iotHubClientStatus = IOTHUB_CLIENT_SEND_STATUS_IDLE;3260 }3261 result = IOTHUB_CLIENT_OK;3535 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_023: [IoTHubTransport_MQTT_Common_GetSendStatus shall return IOTHUB_CLIENT_INVALID_ARG if called with NULL parameter.] */ 3536 LogError("invalid argument."); 3537 result = IOTHUB_CLIENT_INVALID_ARG; 3538 } 3539 else 3540 { 3541 MQTTTRANSPORT_HANDLE_DATA* handleData = (MQTTTRANSPORT_HANDLE_DATA*)handle; 3542 if (!DList_IsListEmpty(handleData->waitingToSend) || !DList_IsListEmpty(&(handleData->telemetry_waitingForAck))) 3543 { 3544 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_025: [IoTHubTransport_MQTT_Common_GetSendStatus shall return IOTHUB_CLIENT_OK and status IOTHUB_CLIENT_SEND_STATUS_BUSY if there are currently event items to be sent or being sent.] */ 3545 *iotHubClientStatus = IOTHUB_CLIENT_SEND_STATUS_BUSY; 3546 } 3547 else 3548 { 3549 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_024: [IoTHubTransport_MQTT_Common_GetSendStatus shall return IOTHUB_CLIENT_OK and status IOTHUB_CLIENT_SEND_STATUS_IDLE if there are currently no event items to be sent or being sent.] */ 3550 *iotHubClientStatus = IOTHUB_CLIENT_SEND_STATUS_IDLE; 3551 } 3552 result = IOTHUB_CLIENT_OK; 3262 3553 } 3263 3554 return result; … … 3423 3714 { 3424 3715 /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_01_009: [ When setting the proxy options succeeds any previously saved proxy options shall be freed. ]*/ 3425 free _proxy_data(transport_data);3716 freeProxyData(transport_data); 3426 3717 3427 3718 transport_data->http_proxy_hostname = copied_proxy_hostname; … … 3443 3734 3444 3735 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_032: [IoTHubTransport_MQTT_Common_SetOption shall pass down the option to xio_setoption if the option parameter is not a known option string for the MQTT transport.] */ 3445 if ( GetTransportProviderIfNecessary(transport_data) == 0)3736 if (CreateTransportProviderIfNecessary(transport_data) == 0) 3446 3737 { 3447 3738 if (xio_setoption(transport_data->xioTransport, option, value) == 0) … … 3462 3753 } 3463 3754 return result; 3464 }3465 3466 static bool check_module_ids_equal(const char* transportModuleId, const char* deviceModuleId)3467 {3468 if ((transportModuleId != NULL) && (deviceModuleId == NULL))3469 {3470 return false;3471 }3472 else if ((transportModuleId == NULL) && (deviceModuleId != NULL))3473 {3474 return false;3475 }3476 else if ((transportModuleId == NULL) && (deviceModuleId == NULL))3477 {3478 return true;3479 }3480 else3481 {3482 return (0 == strcmp(transportModuleId, deviceModuleId));3483 }3484 3755 } 3485 3756 … … 3519 3790 result = NULL; 3520 3791 } 3521 else if (!check _module_ids_equal(STRING_c_str(transport_data->module_id), device->moduleId))3792 else if (!checkModuleIdsEqual(STRING_c_str(transport_data->module_id), device->moduleId)) 3522 3793 { 3523 3794 LogError("IoTHubTransport_MQTT_Common_Register: moduleId does not match."); … … 3650 3921 const char* unsubscribe[1]; 3651 3922 unsubscribe[0] = STRING_c_str(transport_data->topic_InputQueue); 3652 if (mqtt_client_unsubscribe(transport_data->mqttClient, get _next_packet_id(transport_data), unsubscribe, 1) != 0)3923 if (mqtt_client_unsubscribe(transport_data->mqttClient, getNextPacketId(transport_data), unsubscribe, 1) != 0) 3653 3924 { 3654 3925 LogError("Failure calling mqtt_client_unsubscribe");
Note:
See TracChangeset
for help on using the changeset viewer.