Ignore:
Timestamp:
Jun 22, 2021, 9:00:19 PM (3 years ago)
Author:
coas-nagasima
Message:

WolfSSLとAzure IoT SDKを更新

File:
1 edited

Legend:

Unmodified
Added
Removed
  • azure_iot_hub_f767zi/trunk/azure_iot_sdk/iothub_client/src/iothubtransport_mqtt_common.c

    r457 r464  
    8282
    8383static const char* MESSAGE_ID_PROPERTY = "mid";
     84static const char* MESSAGE_CREATION_TIME_UTC = "ctime";
     85static const char* MESSAGE_USER_ID = "uid";
    8486static const char* CORRELATION_ID_PROPERTY = "cid";
    8587static const char* CONTENT_TYPE_PROPERTY = "ct";
     
    200202    MQTT_CLIENT_STATUS mqttClientStatus;
    201203    bool isDestroyCalled;
    202     bool isRetryExpiredCallbackSet;
     204    bool isRetryExpiredCallbackCalled;
    203205    bool device_twin_get_sent;
    204206    bool twin_resp_sub_recv;
     
    274276} DEVICE_METHOD_INFO;
    275277
    276 static void free_proxy_data(MQTTTRANSPORT_HANDLE_DATA* mqtt_transport_instance)
    277 {
    278     if (mqtt_transport_instance->http_proxy_hostname != NULL)
    279     {
    280         free(mqtt_transport_instance->http_proxy_hostname);
    281         mqtt_transport_instance->http_proxy_hostname = NULL;
    282     }
    283 
    284     if (mqtt_transport_instance->http_proxy_username != NULL)
    285     {
    286         free(mqtt_transport_instance->http_proxy_username);
    287         mqtt_transport_instance->http_proxy_username = NULL;
    288     }
    289 
    290     if (mqtt_transport_instance->http_proxy_password != NULL)
    291     {
    292         free(mqtt_transport_instance->http_proxy_password);
    293         mqtt_transport_instance->http_proxy_password = NULL;
    294     }
    295 }
    296 
    297 // Destroys xio transport associated with MQTT handle and resets appropriate state
     278//
     279// InternStrnicmp implements strnicmp.  strnicmp isn't available on all platforms.
     280//
     281static int InternStrnicmp(const char* s1, const char* s2, size_t n)
     282{
     283    int result;
     284
     285    if (s1 == NULL)
     286    {
     287        result = -1;
     288    }
     289    else if (s2 == NULL)
     290    {
     291        result = 1;
     292    }
     293    else
     294    {
     295        result = 0;
     296
     297        while (n-- && result == 0)
     298        {
     299            if (*s1 == 0) result = -1;
     300            else if (*s2 == 0) result = 1;
     301            else
     302            {
     303
     304                result = TOLOWER(*s1) - TOLOWER(*s2);
     305                ++s1;
     306                ++s2;
     307            }
     308        }
     309    }
     310    return result;
     311}
     312
     313//
     314// freeProxyData free()'s and resets proxy related settings of the mqtt_transport_instance.
     315//
     316static void freeProxyData(MQTTTRANSPORT_HANDLE_DATA* transport_data)
     317{
     318    if (transport_data->http_proxy_hostname != NULL)
     319    {
     320        free(transport_data->http_proxy_hostname);
     321        transport_data->http_proxy_hostname = NULL;
     322    }
     323
     324    if (transport_data->http_proxy_username != NULL)
     325    {
     326        free(transport_data->http_proxy_username);
     327        transport_data->http_proxy_username = NULL;
     328    }
     329
     330    if (transport_data->http_proxy_password != NULL)
     331    {
     332        free(transport_data->http_proxy_password);
     333        transport_data->http_proxy_password = NULL;
     334    }
     335}
     336
     337//
     338// DestroyXioTransport frees resources associated with MQTT handle and resets appropriate state
     339//
    298340static void DestroyXioTransport(PMQTTTRANSPORT_HANDLE_DATA transport_data)
    299341{
     342    mqtt_client_clear_xio(transport_data->mqttClient);
    300343    xio_destroy(transport_data->xioTransport);
    301344    transport_data->xioTransport = NULL;
    302345}
    303346
    304 static void set_saved_tls_options(PMQTTTRANSPORT_HANDLE_DATA transport, OPTIONHANDLER_HANDLE new_options)
     347//
     348// setSavedTlsOptions saves off TLS specific options.  This is used
     349// so that during a disconnection, we have these values available for next reconnection.
     350//
     351static void setSavedTlsOptions(PMQTTTRANSPORT_HANDLE_DATA transport, OPTIONHANDLER_HANDLE new_options)
    305352{
    306353    if (transport->saved_tls_options != NULL)
     
    311358}
    312359
    313 static void free_transport_handle_data(MQTTTRANSPORT_HANDLE_DATA* transport_data)
     360//
     361// freeTransportHandleData free()'s 'the transport_data and all members that were allocated by it.
     362//
     363static void freeTransportHandleData(MQTTTRANSPORT_HANDLE_DATA* transport_data)
    314364{
    315365    if (transport_data->mqttClient != NULL)
    316366    {
    317367        mqtt_client_deinit(transport_data->mqttClient);
     368        transport_data->mqttClient = NULL;
    318369    }
    319370
     
    323374    }
    324375
    325     set_saved_tls_options(transport_data, NULL);
     376    setSavedTlsOptions(transport_data, NULL);
    326377
    327378    tickcounter_destroy(transport_data->msgTickCounter);
    328379
    329     free_proxy_data(transport_data);
     380    freeProxyData(transport_data);
    330381
    331382    STRING_delete(transport_data->devicesAndModulesPath);
     
    346397}
    347398
    348 int IoTHubTransport_MQTT_Common_SetRetryPolicy(TRANSPORT_LL_HANDLE handle, IOTHUB_CLIENT_RETRY_POLICY retryPolicy, size_t retryTimeoutLimitInSeconds)
    349 {
    350     int result;
    351 
    352     if (handle == NULL)
    353     {
    354         /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_25_041: [**If any handle is NULL then IoTHubTransport_MQTT_Common_SetRetryPolicy shall return resultant line.] */
    355         LogError("Invalid handle parameter. NULL.");
    356         result = MU_FAILURE;
    357     }
    358     else
    359     {
    360         RETRY_CONTROL_HANDLE new_retry_control_handle;
    361 
    362         // Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_09_006: [ IoTHubTransport_MQTT_Common_SetRetryPolicy shall set the retry logic by calling retry_control_create() with retry policy and retryTimeout as parameters]
    363         // Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_09_009: [ If retry_control_create() fails then IoTHubTransport_MQTT_Common_SetRetryPolicy shall revert to previous retry policy and return non-zero value ]
    364         if ((new_retry_control_handle = retry_control_create(retryPolicy, (unsigned int)retryTimeoutLimitInSeconds)) == NULL)
    365         {
    366             LogError("Failed creating new retry control handle");
    367             result = MU_FAILURE;
    368         }
    369         else
    370         {
    371             PMQTTTRANSPORT_HANDLE_DATA transport_data = (PMQTTTRANSPORT_HANDLE_DATA)handle;
    372             RETRY_CONTROL_HANDLE previous_retry_control_handle = transport_data->retry_control_handle;
    373 
    374             transport_data->retry_control_handle = new_retry_control_handle;
    375             retry_control_destroy(previous_retry_control_handle);
    376 
    377             /*Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_25_045: [**If retry logic for specified parameters of retry policy and retryTimeoutLimitInSeconds is created successfully then IoTHubTransport_MQTT_Common_SetRetryPolicy shall return 0]*/
    378             result = 0;
    379         }
    380     }
    381 
    382     return result;
    383 }
    384 
    385 static uint16_t get_next_packet_id(PMQTTTRANSPORT_HANDLE_DATA transport_data)
     399//
     400// getNextPacketId gets the next Packet Id to use and increments internal counter.
     401//
     402static uint16_t getNextPacketId(PMQTTTRANSPORT_HANDLE_DATA transport_data)
    386403{
    387404    if (transport_data->packetId + 1 >= USHRT_MAX)
     
    397414
    398415#ifndef NO_LOGGING
    399 static const char* retrieve_mqtt_return_codes(CONNECT_RETURN_CODE rtn_code)
     416//
     417// retrieveMqttReturnCodes returns friendly representation of connection code for logging purposes.
     418//
     419static const char* retrieveMqttReturnCodes(CONNECT_RETURN_CODE rtn_code)
    400420{
    401421    switch (rtn_code)
     
    420440#endif // NO_LOGGING
    421441
    422 static int retrieve_device_method_rid_info(const char* resp_topic, STRING_HANDLE method_name, STRING_HANDLE request_id)
     442//
     443// retrievDeviceMethodRidInfo parses an incoming MQTT topic for a device method and retrieves the request ID it specifies.
     444//
     445static int retrievDeviceMethodRidInfo(const char* resp_topic, STRING_HANDLE method_name, STRING_HANDLE request_id)
    423446{
    424447    int result;
     
    484507}
    485508
    486 static int parse_device_twin_topic_info(const char* resp_topic, bool* patch_msg, size_t* request_id, int* status_code)
     509//
     510// parseDeviceTwinTopicInfo parses information about a topic PUBLISH'd to this device/module.
     511//
     512static int parseDeviceTwinTopicInfo(const char* resp_topic, bool* patch_msg, size_t* request_id, int* status_code)
    487513{
    488514    int result;
     
    548574}
    549575
    550 static int InternStrnicmp(const char* s1, const char* s2, size_t n)
    551 {
    552     int result;
    553 
    554     if (s1 == NULL) result = -1;
    555     else if (s2 == NULL) result = 1;
    556     else
    557     {
    558         result = 0;
    559 
    560         while (n-- && result == 0)
    561         {
    562             if (*s1 == 0) result = -1;
    563             else if (*s2 == 0) result = 1;
    564             else
    565             {
    566 
    567                 result = TOLOWER(*s1) - TOLOWER(*s2);
    568                 ++s1;
    569                 ++s2;
    570             }
    571         }
    572     }
    573     return result;
    574 }
    575 
    576 static IOTHUB_IDENTITY_TYPE retrieve_topic_type(const char* topic_resp, const char* input_queue)
     576//
     577// retrieveTopicType translates an MQTT topic PUBLISH'd to this device/module into what type (e.g. twin, method, etc.) it represents.
     578//
     579static IOTHUB_IDENTITY_TYPE retrieveTopicType(const char* topic_resp, const char* input_queue)
    577580{
    578581    IOTHUB_IDENTITY_TYPE type;
     
    598601}
    599602
    600 static void sendMsgComplete(IOTHUB_MESSAGE_LIST* iothubMsgList, PMQTTTRANSPORT_HANDLE_DATA transport_data, IOTHUB_CLIENT_CONFIRMATION_RESULT confirmResult)
     603//
     604// notifyApplicationOfSendMessageComplete lets application know that messages in the iothubMsgList have completed (or should be considered failed) with confirmResult status.
     605//
     606static void notifyApplicationOfSendMessageComplete(IOTHUB_MESSAGE_LIST* iothubMsgList, PMQTTTRANSPORT_HANDLE_DATA transport_data, IOTHUB_CLIENT_CONFIRMATION_RESULT confirmResult)
    601607{
    602608    DLIST_ENTRY messageCompleted;
     
    606612}
    607613
     614//
     615// addUserPropertiesTouMqttMessage translates application properties in iothub_message_handle (set by the application with IoTHubMessage_SetProperty e.g.)
     616// into a representation in the MQTT TOPIC topic_string.
     617//
    608618static int addUserPropertiesTouMqttMessage(IOTHUB_MESSAGE_HANDLE iothub_message_handle, STRING_HANDLE topic_string, size_t* index_ptr, bool urlencode)
    609619{
     
    660670}
    661671
     672//
     673// addSystemPropertyToTopicString appends a given "system" property from iothub_message_handle (set by the application with APIs such as IoTHubMessage_SetMessageId,
     674// IoTHubMessage_SetContentTypeSystemProperty, etc.) onto the MQTT TOPIC topic_string.
     675//
    662676static int addSystemPropertyToTopicString(STRING_HANDLE topic_string, size_t index, const char* property_key, const char* property_value, bool urlencode)
    663677{
     
    690704}
    691705
     706//
     707// addSystemPropertyToTopicString appends all "system" property from iothub_message_handle (set by the application with APIs such as IoTHubMessage_SetMessageId,
     708// IoTHubMessage_SetContentTypeSystemProperty, etc.) onto the MQTT TOPIC topic_string.
     709//
    692710static int addSystemPropertiesTouMqttMessage(IOTHUB_MESSAGE_HANDLE iothub_message_handle, STRING_HANDLE topic_string, size_t* index_ptr, bool urlencode)
    693711{
     
    734752        }
    735753    }
     754
     755    if (result == 0)
     756    {
     757        const char* message_creation_time_utc = IoTHubMessage_GetMessageCreationTimeUtcSystemProperty(iothub_message_handle);
     758        if (message_creation_time_utc != NULL)
     759        {
     760            result = addSystemPropertyToTopicString(topic_string, index, MESSAGE_CREATION_TIME_UTC, message_creation_time_utc, urlencode);
     761            index++;
     762        }
     763    }
     764
    736765    if (result == 0)
    737766    {
     
    754783}
    755784
     785//
     786// addDiagnosticPropertiesTouMqttMessage appends diagnostic data (as specified by IoTHubMessage_SetDiagnosticPropertyData) onto
     787// the MQTT topic topic_string.
     788//
    756789static int addDiagnosticPropertiesTouMqttMessage(IOTHUB_MESSAGE_HANDLE iothub_message_handle, STRING_HANDLE topic_string, size_t* index_ptr)
    757790{
     
    821854}
    822855
    823 
     856//
     857// addPropertiesTouMqttMessage adds user, "system", and diagnostic messages onto MQTT topic string.  Note that "system" properties is a
     858// construct of the SDK and IoT Hub.  The MQTT protocol itself does not assign any significance to system and user properties (as opposed to AMQP).
     859// The IOTHUB_MESSAGE_HANDLE structure however does have well-known properties (e.g. IoTHubMessage_SetMessageId) that the SDK treats as system
     860// properties where we can automatically fill in the key value for in the key=value list.
     861//
    824862static STRING_HANDLE addPropertiesTouMqttMessage(IOTHUB_MESSAGE_HANDLE iothub_message_handle, const char* eventTopic, bool urlencode)
    825863{
     
    868906}
    869907
    870 static int publish_mqtt_telemetry_msg(PMQTTTRANSPORT_HANDLE_DATA transport_data, MQTT_MESSAGE_DETAILS_LIST* mqttMsgEntry, const unsigned char* payload, size_t len)
     908//
     909// publishTelemetryMsg invokes the umqtt layer to send a PUBLISH message.
     910//
     911static int publishTelemetryMsg(PMQTTTRANSPORT_HANDLE_DATA transport_data, MQTT_MESSAGE_DETAILS_LIST* mqttMsgEntry, const unsigned char* payload, size_t len)
    871912{
    872913    int result;
     
    912953}
    913954
    914 static int publish_device_method_message(MQTTTRANSPORT_HANDLE_DATA* transport_data, int status_code, STRING_HANDLE request_id, const unsigned char* response, size_t response_size)
     955//
     956// publishDeviceMethodResponseMsg invokes the umqtt to send a PUBLISH message that contains device method call results.
     957//
     958static int publishDeviceMethodResponseMsg(MQTTTRANSPORT_HANDLE_DATA* transport_data, int status_code, STRING_HANDLE request_id, const unsigned char* response, size_t response_size)
    915959{
    916960    int result;
    917     uint16_t packet_id = get_next_packet_id(transport_data);
     961    uint16_t packet_id = getNextPacketId(transport_data);
    918962
    919963    STRING_HANDLE msg_topic = STRING_construct_sprintf(DEVICE_METHOD_RESPONSE_TOPIC, status_code, STRING_c_str(request_id));
     
    949993}
    950994
    951 
    952 static void destroy_device_twin_get_message(MQTT_DEVICE_TWIN_ITEM* msg_entry)
     995//
     996// destroyDeviceTwinGetMsg frees msg_entry and any data associated with it.
     997//
     998static void destroyDeviceTwinGetMsg(MQTT_DEVICE_TWIN_ITEM* msg_entry)
    953999{
    9541000    free(msg_entry);
    9551001}
    9561002
    957 static MQTT_DEVICE_TWIN_ITEM* create_device_twin_message(MQTTTRANSPORT_HANDLE_DATA* transport_data, DEVICE_TWIN_MSG_TYPE device_twin_msg_type, uint32_t iothub_msg_id)
     1003//
     1004// createDeviceTwinMsg allocates and fills in structure for MQTT_DEVICE_TWIN_ITEM.
     1005//
     1006static MQTT_DEVICE_TWIN_ITEM* createDeviceTwinMsg(MQTTTRANSPORT_HANDLE_DATA* transport_data, DEVICE_TWIN_MSG_TYPE device_twin_msg_type, uint32_t iothub_msg_id)
    9581007{
    9591008    MQTT_DEVICE_TWIN_ITEM* result;
     
    9741023        memset(result, 0, sizeof(*result));
    9751024        result->msgCreationTime = current_time;
    976         result->packet_id = get_next_packet_id(transport_data);
     1025        result->packet_id = getNextPacketId(transport_data);
    9771026        result->iothub_msg_id = iothub_msg_id;
    9781027        result->device_twin_msg_type = device_twin_msg_type;
     
    9821031}
    9831032
    984 static int publish_device_twin_get_message(MQTTTRANSPORT_HANDLE_DATA* transport_data, MQTT_DEVICE_TWIN_ITEM* mqtt_info)
     1033//
     1034// publishDeviceTwinGetMsg invokes umqtt to PUBLISH a request to get the twin information.
     1035//
     1036static int publishDeviceTwinGetMsg(MQTTTRANSPORT_HANDLE_DATA* transport_data, MQTT_DEVICE_TWIN_ITEM* mqtt_info)
    9851037{
    9861038    int result;
     
    10201072}
    10211073
     1074//
     1075// sendPendingGetTwinRequests will send any queued up GetTwin requests during a DoWork loop.
     1076//
    10221077static void sendPendingGetTwinRequests(PMQTTTRANSPORT_HANDLE_DATA transportData)
    10231078{
     
    10311086        (void)DList_RemoveEntryList(dev_twin_item);
    10321087
    1033         if (publish_device_twin_get_message(transportData, msg_entry) != 0)
     1088        if (publishDeviceTwinGetMsg(transportData, msg_entry) != 0)
    10341089        {
    10351090            LogError("Failed sending pending get twin request");
    1036             destroy_device_twin_get_message(msg_entry);
     1091            destroyDeviceTwinGetMsg(msg_entry);
    10371092        }
    10381093        else
     
    10451100}
    10461101
    1047 
     1102//
     1103// removeExpiredTwinRequestsFromList removes any requests that have timed out.
     1104//
    10481105static void removeExpiredTwinRequestsFromList(PMQTTTRANSPORT_HANDLE_DATA transport_data, tickcounter_ms_t current_ms, DLIST_ENTRY* twin_list)
    10491106{
    10501107    PDLIST_ENTRY list_item = twin_list->Flink;
    1051    
     1108
    10521109    while (list_item != twin_list)
    10531110    {
     
    10761133        {
    10771134            (void)DList_RemoveEntryList(list_item);
    1078             destroy_device_twin_get_message(msg_entry);
     1135            destroyDeviceTwinGetMsg(msg_entry);
    10791136        }
    10801137
     
    10841141}
    10851142
     1143//
     1144// removeExpiredTwinRequests removes any requests that have timed out, regardless of how the request invoked.
     1145//
    10861146static void removeExpiredTwinRequests(PMQTTTRANSPORT_HANDLE_DATA transport_data)
    10871147{
     
    10951155}
    10961156
    1097 static int publish_device_twin_message(MQTTTRANSPORT_HANDLE_DATA* transport_data, IOTHUB_DEVICE_TWIN* device_twin_info, MQTT_DEVICE_TWIN_ITEM* mqtt_info)
     1157//
     1158// publishDeviceTwinMsg invokes umqtt to PUBLISH a request for the twin.
     1159//
     1160static int publishDeviceTwinMsg(MQTTTRANSPORT_HANDLE_DATA* transport_data, IOTHUB_DEVICE_TWIN* device_twin_info, MQTT_DEVICE_TWIN_ITEM* mqtt_info)
    10981161{
    10991162    int result;
     
    11491212}
    11501213
     1214//
     1215// changeStateToSubscribeIfAllowed attempts to transition the state machine to subscribe, if our
     1216// current state will allow it. 
     1217// This function does NOT immediately send the SUBSCRIBE however, instead setting things up
     1218// so the next time DoWork is invoked then the SUBSCRIBE will happen.
     1219//
    11511220static void changeStateToSubscribeIfAllowed(PMQTTTRANSPORT_HANDLE_DATA transport_data)
    11521221{
     
    11601229}
    11611230
     1231//
     1232// subscribeToNotifyStateIfNeeded sets up to subscribe to the server.
     1233//
    11621234static int subscribeToNotifyStateIfNeeded(PMQTTTRANSPORT_HANDLE_DATA transport_data)
    11631235{
     
    11911263}
    11921264
    1193 
     1265//
     1266// isSystemProperty returns whether a given property name in an MQTT TOPIC published to this device/module
     1267// is considered a "system".  MQTT does not have a protocol defined concept of system properties.  In this usage
     1268// it implies that the IOTHUB_MESSAGE_HANDLE has an API for direct manipulation of the property (e.g. IoTHubMessage_GetMessageId).
     1269//
    11941270static bool isSystemProperty(const char* tokenData)
    11951271{
     
    11971273    size_t propCount = sizeof(sysPropList) / sizeof(sysPropList[0]);
    11981274    size_t index = 0;
     1275    size_t tokenDataLength = strlen(tokenData);
     1276
    11991277    for (index = 0; index < propCount; index++)
    12001278    {
    1201         if (memcmp(tokenData, sysPropList[index].propName, sysPropList[index].propLength) == 0)
     1279        if (tokenDataLength >= sysPropList[index].propLength &&
     1280            memcmp(tokenData, sysPropList[index].propName, sysPropList[index].propLength) == 0)
    12021281        {
    12031282            result = true;
     
    12081287}
    12091288
    1210 // Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_31_061: [ If the message is sent to an input queue, `IoTHubTransport_MQTT_Common_DoWork` shall parse out to the input queue name and store it in the message with IoTHubMessage_SetInputName ]
    1211 // Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_31_062: [ If IoTHubTransport_MQTT_Common_DoWork receives a malformatted inputQueue, it shall fail ]
    1212 static int addInputNamePropertyToMessage(IOTHUB_MESSAGE_HANDLE IoTHubMessage, const char* topic_name)
    1213 {
     1289//
     1290// addInputNamePropertyToMsg translates the input name (embedded in the MQTT topic name) into the IoTHubMessage handle
     1291// such that the application can call IoTHubMessage_GetInputName() to retrieve this.  This is only currently used
     1292// in IoT Edge module to module message communication, so that this module receiving the message can know which module invoked in.
     1293//
     1294static int addInputNamePropertyToMsg(IOTHUB_MESSAGE_HANDLE IoTHubMessage, const char* topic_name)
     1295{
     1296    // Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_31_061: [ If the message is sent to an input queue, `IoTHubTransport_MQTT_Common_DoWork` shall parse out to the input queue name and store it in the message with IoTHubMessage_SetInputName ]
     1297    // Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_31_062: [ If IoTHubTransport_MQTT_Common_DoWork receives a malformatted inputQueue, it shall fail ]
     1298
    12141299    int result = MU_FAILURE;
    12151300    int number_tokens_read = 0;
     
    12611346}
    12621347
     1348//
     1349// setMqttMessagePropertyIfPossible attempts to translate a "system" property into the IOTHUB_MESSAGE_HANDLE that will be provided to the
     1350// application's callback.
     1351//
    12631352static int setMqttMessagePropertyIfPossible(IOTHUB_MESSAGE_HANDLE IoTHubMessage, const char* propName, const char* propValue, size_t nameLen)
    12641353{
     
    12661355    int result = 0;
    12671356
     1357    if (nameLen > 5)
     1358    {
     1359        if (strcmp((const char*)&propName[nameLen - 5], MESSAGE_CREATION_TIME_UTC) == 0)
     1360        {
     1361            if (IoTHubMessage_SetMessageCreationTimeUtcSystemProperty(IoTHubMessage, propValue) != IOTHUB_MESSAGE_OK)
     1362            {
     1363                LogError("Failed to set IOTHUB_MESSAGE_HANDLE 'CreationTimeUtc' property.");
     1364                result = MU_FAILURE;
     1365            }
     1366            return result;
     1367        }
     1368    }
     1369
    12681370    if (nameLen > 4)
    12691371    {
     
    13091411            return result;
    13101412        }
     1413        else if (strcmp((const char*)&propName[nameLen - 3], MESSAGE_USER_ID) == 0)
     1414        {
     1415            if (IoTHubMessage_SetMessageUserIdSystemProperty(IoTHubMessage, propValue) != IOTHUB_MESSAGE_OK)
     1416            {
     1417                LogError("Failed to set IOTHUB_MESSAGE_HANDLE 'userId' property.");
     1418                result = MU_FAILURE;
     1419            }
     1420            return result;
     1421        }
    13111422    }
    13121423
     
    13381449}
    13391450
     1451//
     1452// extractMqttProperties parses the MQTT topic PUBLISH'd to this device/module, retrieves properties and fills out the
     1453// IOTHUB_MESSAGE_HANDLE which will ultimately be delivered to the application callback.
     1454//
    13401455static int extractMqttProperties(IOTHUB_MESSAGE_HANDLE IoTHubMessage, const char* topic_name, bool urldecode)
    13411456{
     
    15031618}
    15041619
    1505 static void mqtt_notification_callback(MQTT_MESSAGE_HANDLE msgHandle, void* callbackCtx)
    1506 {
    1507     /* Tests_SRS_IOTHUB_MQTT_TRANSPORT_07_051: [ If msgHandle or callbackCtx is NULL, mqtt_notification_callback shall do nothing. ] */
    1508     if (msgHandle != NULL && callbackCtx != NULL)
    1509     {
    1510         /* Tests_SRS_IOTHUB_MQTT_TRANSPORT_07_052: [ mqtt_notification_callback shall extract the topic Name from the MQTT_MESSAGE_HANDLE. ] */
    1511         const char* topic_resp = mqttmessage_getTopicName(msgHandle);
    1512         if (topic_resp == NULL)
    1513         {
    1514             LogError("Failure: NULL topic name encountered");
     1620//
     1621// processTwinNotification processes device and module twin updates made by IoT Hub / IoT Edge.
     1622//
     1623static void processTwinNotification(PMQTTTRANSPORT_HANDLE_DATA transportData, MQTT_MESSAGE_HANDLE msgHandle, const char* topic_resp)
     1624{
     1625    size_t request_id;
     1626    int status_code;
     1627    bool notification_msg;
     1628
     1629    if (parseDeviceTwinTopicInfo(topic_resp, &notification_msg, &request_id, &status_code) != 0)
     1630    {
     1631        LogError("Failure: parsing device topic info");
     1632    }
     1633    else
     1634    {
     1635        const APP_PAYLOAD* payload = mqttmessage_getApplicationMsg(msgHandle);
     1636        if (notification_msg)
     1637        {
     1638            transportData->transport_callbacks.twin_retrieve_prop_complete_cb(DEVICE_TWIN_UPDATE_PARTIAL, payload->message, payload->length, transportData->transport_ctx);
    15151639        }
    15161640        else
    15171641        {
    1518             PMQTTTRANSPORT_HANDLE_DATA transportData = (PMQTTTRANSPORT_HANDLE_DATA)callbackCtx;
    1519 
    1520             IOTHUB_IDENTITY_TYPE type = retrieve_topic_type(topic_resp, STRING_c_str(transportData->topic_InputQueue));
    1521             if (type == IOTHUB_TYPE_DEVICE_TWIN)
    1522             {
    1523                 size_t request_id;
    1524                 int status_code;
    1525                 bool notification_msg;
    1526                 if (parse_device_twin_topic_info(topic_resp, &notification_msg, &request_id, &status_code) != 0)
    1527                 {
    1528                     LogError("Failure: parsing device topic info");
    1529                 }
    1530                 else
    1531                 {
    1532                     const APP_PAYLOAD* payload = mqttmessage_getApplicationMsg(msgHandle);
    1533                     if (notification_msg)
     1642            PDLIST_ENTRY dev_twin_item = transportData->ack_waiting_queue.Flink;
     1643            while (dev_twin_item != &transportData->ack_waiting_queue)
     1644            {
     1645                DLIST_ENTRY saveListEntry;
     1646                saveListEntry.Flink = dev_twin_item->Flink;
     1647                MQTT_DEVICE_TWIN_ITEM* msg_entry = containingRecord(dev_twin_item, MQTT_DEVICE_TWIN_ITEM, entry);
     1648                if (request_id == msg_entry->packet_id)
     1649                {
     1650                    (void)DList_RemoveEntryList(dev_twin_item);
     1651                    if (msg_entry->device_twin_msg_type == RETRIEVE_PROPERTIES)
    15341652                    {
    1535                         transportData->transport_callbacks.twin_retrieve_prop_complete_cb(DEVICE_TWIN_UPDATE_PARTIAL, payload->message, payload->length, transportData->transport_ctx);
     1653                        if (msg_entry->userCallback == NULL)
     1654                        {
     1655                            /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_054: [ If type is IOTHUB_TYPE_DEVICE_TWIN, then on success if msg_type is RETRIEVE_PROPERTIES then mqttNotificationCallback shall call IoTHubClientCore_LL_RetrievePropertyComplete... ] */
     1656                            transportData->transport_callbacks.twin_retrieve_prop_complete_cb(DEVICE_TWIN_UPDATE_COMPLETE, payload->message, payload->length, transportData->transport_ctx);
     1657                            // Only after receiving device twin request should we start listening for patches.
     1658                            (void)subscribeToNotifyStateIfNeeded(transportData);
     1659                        }
     1660                        else
     1661                        {
     1662                            // This is a on-demand get twin request.
     1663                            msg_entry->userCallback(DEVICE_TWIN_UPDATE_COMPLETE, payload->message, payload->length, msg_entry->userContext);
     1664                        }
    15361665                    }
    15371666                    else
    15381667                    {
    1539                         PDLIST_ENTRY dev_twin_item = transportData->ack_waiting_queue.Flink;
    1540                         while (dev_twin_item != &transportData->ack_waiting_queue)
    1541                         {
    1542                             DLIST_ENTRY saveListEntry;
    1543                             saveListEntry.Flink = dev_twin_item->Flink;
    1544                             MQTT_DEVICE_TWIN_ITEM* msg_entry = containingRecord(dev_twin_item, MQTT_DEVICE_TWIN_ITEM, entry);
    1545                             if (request_id == msg_entry->packet_id)
    1546                             {
    1547                                 (void)DList_RemoveEntryList(dev_twin_item);
    1548                                 if (msg_entry->device_twin_msg_type == RETRIEVE_PROPERTIES)
    1549                                 {
    1550                                     if (msg_entry->userCallback == NULL)
    1551                                     {
    1552                                         /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_054: [ If type is IOTHUB_TYPE_DEVICE_TWIN, then on success if msg_type is RETRIEVE_PROPERTIES then mqtt_notification_callback shall call IoTHubClientCore_LL_RetrievePropertyComplete... ] */
    1553                                         transportData->transport_callbacks.twin_retrieve_prop_complete_cb(DEVICE_TWIN_UPDATE_COMPLETE, payload->message, payload->length, transportData->transport_ctx);
    1554                                         // Only after receiving device twin request should we start listening for patches.
    1555                                         (void)subscribeToNotifyStateIfNeeded(transportData);
    1556                                     }
    1557                                     else
    1558                                     {
    1559                                         // This is a on-demand get twin request.
    1560                                         msg_entry->userCallback(DEVICE_TWIN_UPDATE_COMPLETE, payload->message, payload->length, msg_entry->userContext);
    1561                                     }
    1562                                 }
    1563                                 else
    1564                                 {
    1565                                     /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_055: [ if device_twin_msg_type is not RETRIEVE_PROPERTIES then mqtt_notification_callback shall call IoTHubClientCore_LL_ReportedStateComplete ] */
    1566                                     transportData->transport_callbacks.twin_rpt_state_complete_cb(msg_entry->iothub_msg_id, status_code, transportData->transport_ctx);
    1567                                     // Only after receiving device twin request should we start listening for patches.
    1568                                     (void)subscribeToNotifyStateIfNeeded(transportData);
    1569                                 }
    1570 
    1571                                 destroy_device_twin_get_message(msg_entry);
    1572                                 break;
    1573                             }
    1574                             dev_twin_item = saveListEntry.Flink;
    1575                         }
     1668                        /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_055: [ if device_twin_msg_type is not RETRIEVE_PROPERTIES then mqttNotificationCallback shall call IoTHubClientCore_LL_ReportedStateComplete ] */
     1669                        transportData->transport_callbacks.twin_rpt_state_complete_cb(msg_entry->iothub_msg_id, status_code, transportData->transport_ctx);
     1670                        // Only after receiving device twin request should we start listening for patches.
     1671                        (void)subscribeToNotifyStateIfNeeded(transportData);
    15761672                    }
    1577                 }
     1673
     1674                    destroyDeviceTwinGetMsg(msg_entry);
     1675                    break;
     1676                }
     1677                dev_twin_item = saveListEntry.Flink;
     1678            }
     1679        }
     1680    }
     1681}
     1682
     1683//
     1684// processDeviceMethodNotification processes a device and module method invocations made by IoT Hub / IoT Edge.
     1685//
     1686static void processDeviceMethodNotification(PMQTTTRANSPORT_HANDLE_DATA transportData, MQTT_MESSAGE_HANDLE msgHandle, const char* topic_resp)
     1687{
     1688    STRING_HANDLE method_name = STRING_new();
     1689    if (method_name == NULL)
     1690    {
     1691        LogError("Failure: allocating method_name string value");
     1692    }
     1693    else
     1694    {
     1695        DEVICE_METHOD_INFO* dev_method_info = malloc(sizeof(DEVICE_METHOD_INFO));
     1696        if (dev_method_info == NULL)
     1697        {
     1698            LogError("Failure: allocating DEVICE_METHOD_INFO object");
     1699        }
     1700        else
     1701        {
     1702            dev_method_info->request_id = STRING_new();
     1703            if (dev_method_info->request_id == NULL)
     1704            {
     1705                LogError("Failure constructing request_id string");
     1706                free(dev_method_info);
     1707            }
     1708            else if (retrievDeviceMethodRidInfo(topic_resp, method_name, dev_method_info->request_id) != 0)
     1709            {
     1710                LogError("Failure: retrieve device topic info");
     1711                STRING_delete(dev_method_info->request_id);
     1712                free(dev_method_info);
     1713            }
     1714            else
     1715            {
     1716                /* CodesSRS_IOTHUB_MQTT_TRANSPORT_07_053: [ If type is IOTHUB_TYPE_DEVICE_METHODS, then on success mqttNotificationCallback shall call IoTHubClientCore_LL_DeviceMethodComplete. ] */
     1717                const APP_PAYLOAD* payload = mqttmessage_getApplicationMsg(msgHandle);
     1718                if (transportData->transport_callbacks.method_complete_cb(STRING_c_str(method_name), payload->message, payload->length, (void*)dev_method_info, transportData->transport_ctx) != 0)
     1719                {
     1720                    LogError("Failure: IoTHubClientCore_LL_DeviceMethodComplete");
     1721                    STRING_delete(dev_method_info->request_id);
     1722                    free(dev_method_info);
     1723                }
     1724            }
     1725        }
     1726        STRING_delete(method_name);
     1727    }
     1728}
     1729
     1730//
     1731// processIncomingMessageNotification processes both C2D messages and messages sent from one IoT Edge module into this module
     1732//
     1733static void processIncomingMessageNotification(PMQTTTRANSPORT_HANDLE_DATA transportData, MQTT_MESSAGE_HANDLE msgHandle, const char* topic_resp, IOTHUB_IDENTITY_TYPE type)
     1734{
     1735    const APP_PAYLOAD* appPayload = mqttmessage_getApplicationMsg(msgHandle);
     1736    IOTHUB_MESSAGE_HANDLE IoTHubMessage = IoTHubMessage_CreateFromByteArray(appPayload->message, appPayload->length);
     1737    if (IoTHubMessage == NULL)
     1738    {
     1739        LogError("Failure: IotHub Message creation has failed.");
     1740    }
     1741    else if ((type == IOTHUB_TYPE_EVENT_QUEUE) && (addInputNamePropertyToMsg(IoTHubMessage, topic_resp) != 0))
     1742    {
     1743        LogError("failure adding input name to property.");
     1744    }
     1745    else if (extractMqttProperties(IoTHubMessage, topic_resp, transportData->auto_url_encode_decode) != 0)
     1746    {
     1747        LogError("failure extracting mqtt properties.");
     1748    }
     1749    else
     1750    {
     1751        MESSAGE_CALLBACK_INFO* messageData = (MESSAGE_CALLBACK_INFO*)malloc(sizeof(MESSAGE_CALLBACK_INFO));
     1752        if (messageData == NULL)
     1753        {
     1754            LogError("malloc failed");
     1755        }
     1756        else
     1757        {
     1758            messageData->messageHandle = IoTHubMessage;
     1759            messageData->transportContext = NULL;
     1760
     1761            if (type == IOTHUB_TYPE_EVENT_QUEUE)
     1762            {
     1763                // Codes_SRS_IOTHUB_MQTT_TRANSPORT_31_065: [ If type is IOTHUB_TYPE_TELEMETRY and sent to an input queue, then on success `mqttNotificationCallback` shall call `IoTHubClient_LL_MessageCallback`. ]
     1764                if (!transportData->transport_callbacks.msg_input_cb(messageData, transportData->transport_ctx))
     1765                {
     1766                    LogError("IoTHubClientCore_LL_MessageCallbackreturned false");
     1767
     1768                    IoTHubMessage_Destroy(IoTHubMessage);
     1769                    free(messageData);
     1770                }
     1771            }
     1772            else
     1773            {
     1774                /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_056: [ If type is IOTHUB_TYPE_TELEMETRY, then on success mqttNotificationCallback shall call IoTHubClientCore_LL_MessageCallback. ] */
     1775                if (!transportData->transport_callbacks.msg_cb(messageData, transportData->transport_ctx))
     1776                {
     1777                    LogError("IoTHubClientCore_LL_MessageCallback returned false");
     1778                    IoTHubMessage_Destroy(IoTHubMessage);
     1779                    free(messageData);
     1780                }
     1781            }
     1782        }
     1783    }
     1784}
     1785
     1786//
     1787// mqttNotificationCallback processes incoming PUBLISH messages sent from Hub (or IoT Edge) to this device.
     1788// This function is invoked by umqtt.  It determines what topic the PUBLISH was directed at (e.g. Device Twin, Method, etc.),
     1789// performs further parsing based on topic, and translates this call up to "iothub_client" layer for ultimate delivery to application callback.
     1790//
     1791static void mqttNotificationCallback(MQTT_MESSAGE_HANDLE msgHandle, void* callbackCtx)
     1792{
     1793    /* Tests_SRS_IOTHUB_MQTT_TRANSPORT_07_051: [ If msgHandle or callbackCtx is NULL, mqttNotificationCallback shall do nothing. ] */
     1794    if (msgHandle != NULL && callbackCtx != NULL)
     1795    {
     1796        /* Tests_SRS_IOTHUB_MQTT_TRANSPORT_07_052: [ mqttNotificationCallback shall extract the topic Name from the MQTT_MESSAGE_HANDLE. ] */
     1797        const char* topic_resp = mqttmessage_getTopicName(msgHandle);
     1798        if (topic_resp == NULL)
     1799        {
     1800            LogError("Failure: NULL topic name encountered");
     1801        }
     1802        else
     1803        {
     1804            PMQTTTRANSPORT_HANDLE_DATA transportData = (PMQTTTRANSPORT_HANDLE_DATA)callbackCtx;
     1805
     1806            IOTHUB_IDENTITY_TYPE type = retrieveTopicType(topic_resp, STRING_c_str(transportData->topic_InputQueue));
     1807            if (type == IOTHUB_TYPE_DEVICE_TWIN)
     1808            {
     1809                processTwinNotification(transportData, msgHandle, topic_resp);
    15781810            }
    15791811            else if (type == IOTHUB_TYPE_DEVICE_METHODS)
    15801812            {
    1581                 STRING_HANDLE method_name = STRING_new();
    1582                 if (method_name == NULL)
    1583                 {
    1584                     LogError("Failure: allocating method_name string value");
    1585                 }
    1586                 else
    1587                 {
    1588                     DEVICE_METHOD_INFO* dev_method_info = malloc(sizeof(DEVICE_METHOD_INFO));
    1589                     if (dev_method_info == NULL)
    1590                     {
    1591                         LogError("Failure: allocating DEVICE_METHOD_INFO object");
    1592                     }
    1593                     else
    1594                     {
    1595                         dev_method_info->request_id = STRING_new();
    1596                         if (dev_method_info->request_id == NULL)
    1597                         {
    1598                             LogError("Failure constructing request_id string");
    1599                             free(dev_method_info);
    1600                         }
    1601                         else if (retrieve_device_method_rid_info(topic_resp, method_name, dev_method_info->request_id) != 0)
    1602                         {
    1603                             LogError("Failure: retrieve device topic info");
    1604                             STRING_delete(dev_method_info->request_id);
    1605                             free(dev_method_info);
    1606                         }
    1607                         else
    1608                         {
    1609                             /* CodesSRS_IOTHUB_MQTT_TRANSPORT_07_053: [ If type is IOTHUB_TYPE_DEVICE_METHODS, then on success mqtt_notification_callback shall call IoTHubClientCore_LL_DeviceMethodComplete. ] */
    1610                             const APP_PAYLOAD* payload = mqttmessage_getApplicationMsg(msgHandle);
    1611                             if (transportData->transport_callbacks.method_complete_cb(STRING_c_str(method_name), payload->message, payload->length, (void*)dev_method_info, transportData->transport_ctx) != 0)
    1612                             {
    1613                                 LogError("Failure: IoTHubClientCore_LL_DeviceMethodComplete");
    1614                                 STRING_delete(dev_method_info->request_id);
    1615                                 free(dev_method_info);
    1616                             }
    1617                         }
    1618                     }
    1619                     STRING_delete(method_name);
    1620                 }
     1813                processDeviceMethodNotification(transportData, msgHandle, topic_resp);
    16211814            }
    16221815            else
    16231816            {
    1624                 const APP_PAYLOAD* appPayload = mqttmessage_getApplicationMsg(msgHandle);
    1625                 IOTHUB_MESSAGE_HANDLE IoTHubMessage = IoTHubMessage_CreateFromByteArray(appPayload->message, appPayload->length);
    1626                 if (IoTHubMessage == NULL)
    1627                 {
    1628                     LogError("Failure: IotHub Message creation has failed.");
    1629                 }
    1630                 else
    1631                 {
    1632                     if ((type == IOTHUB_TYPE_EVENT_QUEUE) && (addInputNamePropertyToMessage(IoTHubMessage, topic_resp) != 0))
    1633                     {
    1634                         LogError("failure adding input name to property.");
    1635                     }
    1636                     // Will need to update this when the service has messages that can be rejected
    1637                     else if (extractMqttProperties(IoTHubMessage, topic_resp, transportData->auto_url_encode_decode) != 0)
    1638                     {
    1639                         LogError("failure extracting mqtt properties.");
    1640                     }
    1641                     else
    1642                     {
    1643                         MESSAGE_CALLBACK_INFO* messageData = (MESSAGE_CALLBACK_INFO*)malloc(sizeof(MESSAGE_CALLBACK_INFO));
    1644                         if (messageData == NULL)
    1645                         {
    1646                             LogError("malloc failed");
    1647                         }
    1648                         else
    1649                         {
    1650                             messageData->messageHandle = IoTHubMessage;
    1651                             messageData->transportContext = NULL;
    1652 
    1653                             if (type == IOTHUB_TYPE_EVENT_QUEUE)
    1654                             {
    1655                                 // Codes_SRS_IOTHUB_MQTT_TRANSPORT_31_065: [ If type is IOTHUB_TYPE_TELEMETRY and sent to an input queue, then on success `mqtt_notification_callback` shall call `IoTHubClient_LL_MessageCallback`. ]
    1656                                 if (!transportData->transport_callbacks.msg_input_cb(messageData, transportData->transport_ctx))
    1657                                 {
    1658                                     LogError("IoTHubClientCore_LL_MessageCallbackreturned false");
    1659 
    1660                                     IoTHubMessage_Destroy(IoTHubMessage);
    1661                                     free(messageData);
    1662                                 }
    1663                             }
    1664                             else
    1665                             {
    1666                                 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_056: [ If type is IOTHUB_TYPE_TELEMETRY, then on success mqtt_notification_callback shall call IoTHubClientCore_LL_MessageCallback. ] */
    1667                                 if (!transportData->transport_callbacks.msg_cb(messageData, transportData->transport_ctx))
    1668                                 {
    1669                                     LogError("IoTHubClientCore_LL_MessageCallback returned false");
    1670                                     IoTHubMessage_Destroy(IoTHubMessage);
    1671                                     free(messageData);
    1672                                 }
    1673                             }
    1674                         }
    1675                     }
    1676                 }
    1677             }
    1678         }
    1679     }
    1680 }
    1681 
    1682 static void mqtt_operation_complete_callback(MQTT_CLIENT_HANDLE handle, MQTT_CLIENT_EVENT_RESULT actionResult, const void* msgInfo, void* callbackCtx)
     1817                processIncomingMessageNotification(transportData, msgHandle, topic_resp, type);
     1818            }
     1819        }
     1820    }
     1821}
     1822
     1823//
     1824// mqttOperationCompleteCallback is invoked by umqtt when an operation initiated by the device completes.
     1825// Examples of device initiated operations include PUBLISH, CONNECT, and SUBSCRIBE.
     1826//
     1827static void mqttOperationCompleteCallback(MQTT_CLIENT_HANDLE handle, MQTT_CLIENT_EVENT_RESULT actionResult, const void* msgInfo, void* callbackCtx)
    16831828{
    16841829    (void)handle;
     
    17051850                        {
    17061851                            (void)DList_RemoveEntryList(currentListEntry); //First remove the item from Waiting for Ack List.
    1707                             sendMsgComplete(mqttMsgEntry->iotHubMessageEntry, transport_data, IOTHUB_CLIENT_CONFIRMATION_OK);
     1852                            notifyApplicationOfSendMessageComplete(mqttMsgEntry->iotHubMessageEntry, transport_data, IOTHUB_CLIENT_CONFIRMATION_OK);
    17081853                            free(mqttMsgEntry);
    17091854                        }
     
    17531898                            transport_data->isRecoverableError = false;
    17541899                        }
    1755                         LogError("Connection Not Accepted: 0x%x: %s", connack->returnCode, retrieve_mqtt_return_codes(connack->returnCode));
     1900                        LogError("Connection Not Accepted: 0x%x: %s", connack->returnCode, retrieveMqttReturnCodes(connack->returnCode));
    17561901                        transport_data->mqttClientStatus = MQTT_CLIENT_STATUS_PENDING_CLOSE;
    17571902                        transport_data->currPacketState = PACKET_TYPE_ERROR;
     
    18221967    {
    18231968        OPTIONHANDLER_HANDLE options = xio_retrieveoptions(transport_data->xioTransport);
    1824         set_saved_tls_options(transport_data, options);
     1969        setSavedTlsOptions(transport_data, options);
    18251970        DestroyXioTransport(transport_data);
    18261971    }
    18271972}
    18281973
    1829 static void mqtt_disconnect_cb(void* ctx)
     1974//
     1975// processDisconnectCallback is a callback invoked by umqtt to signal that the disconnection has completed.
     1976//
     1977static void processDisconnectCallback(void* ctx)
    18301978{
    18311979    if (ctx != NULL)
     
    18361984}
    18371985
     1986//
     1987// DisconnectFromClient will tear down the existing MQTT connection, trying to gracefully send an MQTT DISCONNECT (with a timeout),
     1988// destroy the underlying xio for network communication, and update the transport_data state machine appropriately.
     1989//
     1990//NOTE: After a call to DisconnectFromClient, determine if appropriate to also call
     1991//      transport_data->transport_callbacks.connection_status_cb().
    18381992static void DisconnectFromClient(PMQTTTRANSPORT_HANDLE_DATA transport_data)
    18391993{
     
    18431997        {
    18441998            OPTIONHANDLER_HANDLE options = xio_retrieveoptions(transport_data->xioTransport);
    1845             set_saved_tls_options(transport_data, options);
     1999            setSavedTlsOptions(transport_data, options);
    18462000        }
    18472001        // Ensure the disconnect message is sent
     
    18492003        {
    18502004            transport_data->disconnect_recv_flag = 0;
    1851             (void)mqtt_client_disconnect(transport_data->mqttClient, mqtt_disconnect_cb, &transport_data->disconnect_recv_flag);
     2005            (void)mqtt_client_disconnect(transport_data->mqttClient, processDisconnectCallback, &transport_data->disconnect_recv_flag);
    18522006            size_t disconnect_ctr = 0;
    18532007            do
     
    18662020}
    18672021
    1868 static void mqtt_error_callback(MQTT_CLIENT_HANDLE handle, MQTT_CLIENT_EVENT_ERROR error, void* callbackCtx)
     2022//
     2023// processErrorCallback is invoked by umqtt when an error has occurred.
     2024//
     2025static void processErrorCallback(MQTT_CLIENT_HANDLE handle, MQTT_CLIENT_EVENT_ERROR error, void* callbackCtx)
    18692026{
    18702027    (void)handle;
     
    19352092}
    19362093
     2094//
     2095// SubscribeToMqttProtocol determines which topics we should SUBSCRIBE to, based on existing state, and then
     2096// invokes the underlying umqtt layer to send the SUBSCRIBE across the network.
     2097//
    19372098static void SubscribeToMqttProtocol(PMQTTTRANSPORT_HANDLE_DATA transport_data)
    19382099{
     
    19412102        uint32_t topic_subscription = 0;
    19422103        size_t subscribe_count = 0;
    1943         uint16_t packet_id = get_next_packet_id(transport_data);
     2104        uint16_t packet_id = getNextPacketId(transport_data);
    19442105        SUBSCRIBE_PAYLOAD subscribe[SUBSCRIBE_TOPIC_COUNT];
    19452106        if ((transport_data->topic_MqttMessage != NULL) && (SUBSCRIBE_TELEMETRY_TOPIC & transport_data->topics_ToSubscribe))
     
    20082169}
    20092170
     2171//
     2172// RetrieveMessagePayload translates the payload set by the application in messageHandle into payload/length for sending across the network.
     2173//
    20102174static bool RetrieveMessagePayload(IOTHUB_MESSAGE_HANDLE messageHandle, const unsigned char** payload, size_t* length)
    20112175{
     
    20482212}
    20492213
    2050 static void process_queued_ack_messages(PMQTTTRANSPORT_HANDLE_DATA transport_data)
     2214//
     2215// ProcessPendingTelemetryMessages examines each telemetry message the device/module has sent that hasn't yet been PUBACK'd. 
     2216// For each message, it might:
     2217// * Ignore it, if its timeout has not yet been reached.
     2218// * Attempt to retry PUBLISH the message, if has remaining retries left.
     2219// * Stop attempting to send the message.  This will result in tearing down the underlying MQTT/TCP connection because it indicates
     2220//   something is wrong.
     2221//
     2222static void ProcessPendingTelemetryMessages(PMQTTTRANSPORT_HANDLE_DATA transport_data)
    20512223{
    20522224    PDLIST_ENTRY current_entry = transport_data->telemetry_waitingForAck.Flink;
     
    20632235            if (msg_detail_entry->retryCount >= MAX_SEND_RECOUNT_LIMIT)
    20642236            {
    2065                 sendMsgComplete(msg_detail_entry->iotHubMessageEntry, transport_data, IOTHUB_CLIENT_CONFIRMATION_MESSAGE_TIMEOUT);
     2237                notifyApplicationOfSendMessageComplete(msg_detail_entry->iotHubMessageEntry, transport_data, IOTHUB_CLIENT_CONFIRMATION_MESSAGE_TIMEOUT);
    20662238                (void)DList_RemoveEntryList(current_entry);
    20672239                free(msg_detail_entry);
    20682240
    20692241                DisconnectFromClient(transport_data);
     2242                if (!transport_data->isRetryExpiredCallbackCalled) // Only call once
     2243                {
     2244                    transport_data->transport_callbacks.connection_status_cb(IOTHUB_CLIENT_CONNECTION_UNAUTHENTICATED, IOTHUB_CLIENT_CONNECTION_RETRY_EXPIRED, transport_data->transport_ctx);
     2245                    transport_data->isRetryExpiredCallbackCalled = true;
     2246                }
    20702247            }
    20712248            else
     
    20802257                    {
    20812258                        (void)DList_RemoveEntryList(current_entry);
    2082                         sendMsgComplete(msg_detail_entry->iotHubMessageEntry, transport_data, IOTHUB_CLIENT_CONFIRMATION_ERROR);
     2259                        notifyApplicationOfSendMessageComplete(msg_detail_entry->iotHubMessageEntry, transport_data, IOTHUB_CLIENT_CONFIRMATION_ERROR);
    20832260                    }
    20842261                    else
    20852262                    {
    2086                         if (publish_mqtt_telemetry_msg(transport_data, msg_detail_entry, messagePayload, messageLength) != 0)
     2263                        if (publishTelemetryMsg(transport_data, msg_detail_entry, messagePayload, messageLength) != 0)
    20872264                        {
    20882265                            (void)DList_RemoveEntryList(current_entry);
    2089                             sendMsgComplete(msg_detail_entry->iotHubMessageEntry, transport_data, IOTHUB_CLIENT_CONFIRMATION_ERROR);
     2266                            notifyApplicationOfSendMessageComplete(msg_detail_entry->iotHubMessageEntry, transport_data, IOTHUB_CLIENT_CONFIRMATION_ERROR);
    20902267                            free(msg_detail_entry);
    20912268                        }
     
    21032280}
    21042281
    2105 static int GetTransportProviderIfNecessary(PMQTTTRANSPORT_HANDLE_DATA transport_data)
     2282//
     2283// CreateTransportProviderIfNecessary will create the underlying xioTransport (which handles networking I/O) and
     2284// set its options, assuming the xioTransport does not already exist.
     2285//
     2286static int CreateTransportProviderIfNecessary(PMQTTTRANSPORT_HANDLE_DATA transport_data)
    21062287{
    21072288    int result;
     
    21382319                {
    21392320                    // The tlsio has the options, so our copy can be deleted
    2140                     set_saved_tls_options(transport_data, NULL);
     2321                    setSavedTlsOptions(transport_data, NULL);
    21412322                    result = 0;
    21422323                }
     
    21672348}
    21682349
     2350//
     2351// buildClientId creates the MQTT ClientId of this device or module.
     2352//
    21692353static STRING_HANDLE buildClientId(const char* device_id, const char* module_id)
    21702354{
     
    21792363}
    21802364
     2365//
     2366// buildConfigForUsernameStep2IfNeeded builds the MQTT username.  IoT Hub uses the query string of the userName to optionally
     2367// specify SDK information, product information optionally specified by the application, and optionally the PnP ModelId.
     2368//
    21812369static int buildConfigForUsernameStep2IfNeeded(PMQTTTRANSPORT_HANDLE_DATA transport_data)
    21822370{
     
    21912379        // TODO: The preview API version in SDK is only scoped to scenarios that require the modelId to be set.
    21922380        // https://github.com/Azure/azure-iot-sdk-c/issues/1547 tracks removing this once non-preview API versions support modelId.
    2193         const char* apiVersion = (modelId != NULL) ? IOTHUB_API_PREVIEW_VERSION : IOTHUB_API_VERSION;
     2381        const char* apiVersion = IOTHUB_API_VERSION;
    21942382        const char* appSpecifiedProductInfo = transport_data->transport_callbacks.prod_info_cb(transport_data->transport_ctx);
    2195         STRING_HANDLE productInfoEncoded = NULL; 
     2383        STRING_HANDLE productInfoEncoded = NULL;
    21962384
    21972385        if ((productInfoEncoded = URL_EncodeString((appSpecifiedProductInfo != NULL) ? appSpecifiedProductInfo : DEFAULT_IOTHUB_PRODUCT_IDENTIFIER)) == NULL)
     
    22542442}
    22552443
     2444//
     2445// SendMqttConnectMsg sends the MQTT CONNECT message across the network.  This function may also
     2446// perform security functionality for building up the required token (optionally invoking into DPS if configured to do so)
     2447//
    22562448static int SendMqttConnectMsg(PMQTTTRANSPORT_HANDLE_DATA transport_data)
    22572449{
     
    23222514            options.qualityOfServiceValue = DELIVER_AT_LEAST_ONCE;
    23232515
    2324             if (GetTransportProviderIfNecessary(transport_data) == 0)
     2516            if (CreateTransportProviderIfNecessary(transport_data) == 0)
    23252517            {
    23262518                transport_data->conn_attempted = true;
     
    23332525                {
    23342526                    transport_data->currPacketState = CONNECT_TYPE;
    2335                     transport_data->isRetryExpiredCallbackSet = false;
     2527                    transport_data->isRetryExpiredCallbackCalled = false;
    23362528                    (void)tickcounter_get_current_ms(transport_data->msgTickCounter, &transport_data->mqtt_connect_time);
    23372529                    result = 0;
     
    23532545}
    23542546
    2355 static int InitializeConnection(PMQTTTRANSPORT_HANDLE_DATA transport_data)
     2547//
     2548// UpdateMqttConnectionStateIfNeeded is used for updating MQTT's underlying connection status during a DoWork loop.
     2549// Among this function's responsibilities:
     2550// * Attempt to establish an MQTT connection if one has not been already.
     2551// * Retries failed connection, if in the correct state.
     2552// * Processes deferred disconnect requests
     2553// * Checks timeouts, for instance on connection establishment time as well as SaS token lifetime (if SAS used)
     2554static int UpdateMqttConnectionStateIfNeeded(PMQTTTRANSPORT_HANDLE_DATA transport_data)
    23562555{
    23572556    int result = 0;
     
    23932592            {
    23942593                // Set callback if retry expired
    2395                 if (!transport_data->isRetryExpiredCallbackSet)
     2594                if (!transport_data->isRetryExpiredCallbackCalled)
    23962595                {
    23972596                    transport_data->transport_callbacks.connection_status_cb(IOTHUB_CLIENT_CONNECTION_UNAUTHENTICATED, IOTHUB_CLIENT_CONNECTION_RETRY_EXPIRED, transport_data->transport_ctx);
    2398                     transport_data->isRetryExpiredCallbackSet = true;
     2597                    transport_data->isRetryExpiredCallbackCalled = true;
    23992598                }
    24002599                result = MU_FAILURE;
     
    24432642                if (cred_type != IOTHUB_CREDENTIAL_TYPE_X509 && cred_type != IOTHUB_CREDENTIAL_TYPE_X509_ECC)
    24442643                {
    2445                     size_t sas_token_expiry = IoTHubClient_Auth_Get_SasToken_Expiry(transport_data->authorization_module);
     2644                    uint64_t sas_token_expiry = IoTHubClient_Auth_Get_SasToken_Expiry(transport_data->authorization_module);
    24462645                    if ((current_time - transport_data->mqtt_connect_time) / 1000 > (sas_token_expiry*SAS_REFRESH_MULTIPLIER))
    24472646                    {
     
    24942693}
    24952694
     2695//
     2696// buildMqttEventString creates the MQTT topic for this device (and optionally module) to PUBLISH telemetry to.
     2697//
    24962698static STRING_HANDLE buildMqttEventString(const char* device_id, const char* module_id)
    24972699{
     
    25062708}
    25072709
     2710//
     2711// buildDevicesAndModulesPath builds the path used when generating a SaS token for this request.
     2712//
    25082713static STRING_HANDLE buildDevicesAndModulesPath(const IOTHUB_CLIENT_CONFIG* upperConfig, const char* moduleId)
    25092714{
     
    25182723}
    25192724
     2725//
     2726// buildTopicMqttMsg builds the MQTT topic that is used for C2D messages sent to a device or module-to-module messages for a module running in IoT Edge
     2727//
     2728static STRING_HANDLE buildTopicMqttMsg(const char* device_id, const char* module_id)
     2729{
     2730    if (module_id == NULL)
     2731    {
     2732        return STRING_construct_sprintf(TOPIC_DEVICE_MSG, device_id);
     2733    }
     2734    else
     2735    {
     2736        return STRING_construct_sprintf(TOPIC_DEVICE_MODULE_MSG, device_id, module_id);
     2737    }
     2738}
     2739
     2740//
     2741// checkModuleIdsEqual verifies that module Ids coming from different upper layers are equal.
     2742//
     2743static bool checkModuleIdsEqual(const char* transportModuleId, const char* deviceModuleId)
     2744{
     2745    if ((transportModuleId != NULL) && (deviceModuleId == NULL))
     2746    {
     2747        return false;
     2748    }
     2749    else if ((transportModuleId == NULL) && (deviceModuleId != NULL))
     2750    {
     2751        return false;
     2752    }
     2753    else if ((transportModuleId == NULL) && (deviceModuleId == NULL))
     2754    {
     2755        return true;
     2756    }
     2757    else
     2758    {
     2759        return (0 == strcmp(transportModuleId, deviceModuleId));
     2760    }
     2761}
     2762
     2763//
     2764// InitializeTransportHandleData creates a MQTTTRANSPORT_HANDLE_DATA.
     2765//
    25202766static PMQTTTRANSPORT_HANDLE_DATA InitializeTransportHandleData(const IOTHUB_CLIENT_CONFIG* upperConfig, PDLIST_ENTRY waitingToSend, IOTHUB_AUTHORIZATION_HANDLE auth_module, const char* moduleId)
    25212767{
     
    25312777        {
    25322778            LogError("Invalid Argument: iotHubName is empty");
    2533             free_transport_handle_data(state);
     2779            freeTransportHandleData(state);
    25342780            state = NULL;
    25352781        }
     
    25382784        {
    25392785            LogError("Failed creating default retry control");
    2540             free_transport_handle_data(state);
     2786            freeTransportHandleData(state);
    25412787            state = NULL;
    25422788        }
     
    25442790        {
    25452791            LogError("failure constructing device_id.");
    2546             free_transport_handle_data(state);
     2792            freeTransportHandleData(state);
    25472793            state = NULL;
    25482794        }
     
    25502796        {
    25512797            LogError("failure constructing module_id.");
    2552             free_transport_handle_data(state);
     2798            freeTransportHandleData(state);
    25532799            state = NULL;
    25542800        }
     
    25562802        {
    25572803            LogError("failure constructing devicesPath.");
    2558             free_transport_handle_data(state);
     2804            freeTransportHandleData(state);
    25592805            state = NULL;
    25602806        }
     
    25642810            {
    25652811                LogError("Could not create topic_MqttEvent for MQTT");
    2566                 free_transport_handle_data(state);
     2812                freeTransportHandleData(state);
    25672813                state = NULL;
    25682814            }
    25692815            else
    25702816            {
    2571                 state->mqttClient = mqtt_client_init(mqtt_notification_callback, mqtt_operation_complete_callback, state, mqtt_error_callback, state);
     2817                state->mqttClient = mqtt_client_init(mqttNotificationCallback, mqttOperationCompleteCallback, state, processErrorCallback, state);
    25722818                if (state->mqttClient == NULL)
    25732819                {
    25742820                    LogError("failure initializing mqtt client.");
    2575                     free_transport_handle_data(state);
     2821                    freeTransportHandleData(state);
    25762822                    state = NULL;
    25772823                }
     
    25912837                    {
    25922838                        LogError("failure constructing host address.");
    2593                         free_transport_handle_data(state);
     2839                        freeTransportHandleData(state);
    25942840                        state = NULL;
    25952841                    }
    25962842                    else if ((state->configPassedThroughUsername = buildConfigForUsernameStep1(upperConfig, moduleId)) == NULL)
    25972843                    {
    2598                         free_transport_handle_data(state);
     2844                        freeTransportHandleData(state);
    25992845                        state = NULL;
    26002846                    }
     
    26172863
    26182864                        state->isDestroyCalled = false;
    2619                         state->isRetryExpiredCallbackSet = false;
     2865                        state->isRetryExpiredCallbackCalled = false;
    26202866                        state->isRegistered = false;
    26212867                        state->device_twin_get_sent = false;
     
    26412887}
    26422888
     2889//
     2890// ProcessSubackDoWork processes state transitions responding to a SUBACK packet.
     2891// This does NOT occur once we receive the SUBACK packet immediately; instead the work is
     2892// deferred to the DoWork loop.
     2893//
     2894static void ProcessSubackDoWork(PMQTTTRANSPORT_HANDLE_DATA transport_data)
     2895{
     2896    if ((transport_data->topic_NotifyState != NULL || transport_data->topic_GetState != NULL) &&
     2897        !transport_data->device_twin_get_sent)
     2898    {
     2899        /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_055: [ IoTHubTransport_MQTT_Common_DoWork shall send a device twin get property message upon successfully retrieving a SUBACK on device twin topics. ] */
     2900        MQTT_DEVICE_TWIN_ITEM* mqtt_info;
     2901
     2902        if ((mqtt_info = createDeviceTwinMsg(transport_data, RETRIEVE_PROPERTIES, 0)) == NULL)
     2903        {
     2904            LogError("Failure: could not create message for twin get command");
     2905        }
     2906        else if (publishDeviceTwinGetMsg(transport_data, mqtt_info) == 0)
     2907        {
     2908            transport_data->device_twin_get_sent = true;
     2909        }
     2910        else
     2911        {
     2912            LogError("Failure: sending device twin get command.");
     2913            destroyDeviceTwinGetMsg(mqtt_info);
     2914        }
     2915    }
     2916
     2917    // Publish can be called now and in any event we need to transition out of this state.
     2918    transport_data->currPacketState = PUBLISH_TYPE;
     2919}
     2920
     2921//
     2922// ProcessPublishStateDoWork traverses all messages waiting to be sent and attempts to PUBLISH them.
     2923//
     2924static void ProcessPublishStateDoWork(PMQTTTRANSPORT_HANDLE_DATA transport_data)
     2925{
     2926    PDLIST_ENTRY currentListEntry = transport_data->waitingToSend->Flink;
     2927    /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_027: [IoTHubTransport_MQTT_Common_DoWork shall inspect the "waitingToSend" DLIST passed in config structure.] */
     2928    while (currentListEntry != transport_data->waitingToSend)
     2929    {
     2930        IOTHUB_MESSAGE_LIST* iothubMsgList = containingRecord(currentListEntry, IOTHUB_MESSAGE_LIST, entry);
     2931        DLIST_ENTRY savedFromCurrentListEntry;
     2932        savedFromCurrentListEntry.Flink = currentListEntry->Flink;
     2933
     2934        /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_027: [IoTHubTransport_MQTT_Common_DoWork shall inspect the "waitingToSend" DLIST passed in config structure.] */
     2935        size_t messageLength;
     2936        const unsigned char* messagePayload = NULL;
     2937        if (!RetrieveMessagePayload(iothubMsgList->messageHandle, &messagePayload, &messageLength))
     2938        {
     2939            (void)(DList_RemoveEntryList(currentListEntry));
     2940            notifyApplicationOfSendMessageComplete(iothubMsgList, transport_data, IOTHUB_CLIENT_CONFIRMATION_ERROR);
     2941            LogError("Failure result from IoTHubMessage_GetData");
     2942        }
     2943        else
     2944        {
     2945            /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_029: [IoTHubTransport_MQTT_Common_DoWork shall create a MQTT_MESSAGE_HANDLE and pass this to a call to mqtt_client_publish.] */
     2946            MQTT_MESSAGE_DETAILS_LIST* mqttMsgEntry = (MQTT_MESSAGE_DETAILS_LIST*)malloc(sizeof(MQTT_MESSAGE_DETAILS_LIST));
     2947            if (mqttMsgEntry == NULL)
     2948            {
     2949                LogError("Allocation Error: Failure allocating MQTT Message Detail List.");
     2950            }
     2951            else
     2952            {
     2953                mqttMsgEntry->retryCount = 0;
     2954                mqttMsgEntry->iotHubMessageEntry = iothubMsgList;
     2955                mqttMsgEntry->packet_id = getNextPacketId(transport_data);
     2956                if (publishTelemetryMsg(transport_data, mqttMsgEntry, messagePayload, messageLength) != 0)
     2957                {
     2958                    (void)(DList_RemoveEntryList(currentListEntry));
     2959                    notifyApplicationOfSendMessageComplete(iothubMsgList, transport_data, IOTHUB_CLIENT_CONFIRMATION_ERROR);
     2960                    free(mqttMsgEntry);
     2961                }
     2962                else
     2963                {
     2964                    // Remove the message from the waiting queue ...
     2965                    (void)(DList_RemoveEntryList(currentListEntry));
     2966                    // and add it to the ack queue
     2967                    DList_InsertTailList(&(transport_data->telemetry_waitingForAck), &(mqttMsgEntry->entry));
     2968                }
     2969            }
     2970        }
     2971        currentListEntry = savedFromCurrentListEntry.Flink;
     2972    }
     2973
     2974    if (transport_data->twin_resp_sub_recv)
     2975    {
     2976        sendPendingGetTwinRequests(transport_data);
     2977    }
     2978}
     2979
    26432980TRANSPORT_LL_HANDLE IoTHubTransport_MQTT_Common_Create(const IOTHUBTRANSPORT_CONFIG* config, MQTT_GET_IO_TRANSPORT get_io_transport, TRANSPORT_CALLBACKS_INFO* cb_info, void* ctx)
    26442981{
     
    27413078            PDLIST_ENTRY currentEntry = DList_RemoveHeadList(&transport_data->telemetry_waitingForAck);
    27423079            MQTT_MESSAGE_DETAILS_LIST* mqttMsgEntry = containingRecord(currentEntry, MQTT_MESSAGE_DETAILS_LIST, entry);
    2743             sendMsgComplete(mqttMsgEntry->iotHubMessageEntry, transport_data, IOTHUB_CLIENT_CONFIRMATION_BECAUSE_DESTROY);
     3080            notifyApplicationOfSendMessageComplete(mqttMsgEntry->iotHubMessageEntry, transport_data, IOTHUB_CLIENT_CONFIRMATION_BECAUSE_DESTROY);
    27443081            free(mqttMsgEntry);
    27453082        }
     
    27583095            }
    27593096
    2760             destroy_device_twin_get_message(mqtt_device_twin);
     3097            destroyDeviceTwinGetMsg(mqtt_device_twin);
    27613098        }
    27623099        while (!DList_IsListEmpty(&transport_data->pending_get_twin_queue))
     
    27683105            mqtt_device_twin->userCallback(DEVICE_TWIN_UPDATE_COMPLETE, NULL, 0, mqtt_device_twin->userContext);
    27693106
    2770             destroy_device_twin_get_message(mqtt_device_twin);
     3107            destroyDeviceTwinGetMsg(mqtt_device_twin);
    27713108        }
    27723109
    27733110        /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_014: [IoTHubTransport_MQTT_Common_Destroy shall free all the resources currently in use.] */
    27743111        /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_01_012: [ `IoTHubTransport_MQTT_Common_Destroy` shall free the stored proxy options. ]*/
    2775         free_transport_handle_data(transport_data);
     3112        freeTransportHandleData(transport_data);
    27763113    }
    27773114}
     
    28603197        MQTT_DEVICE_TWIN_ITEM* mqtt_info;
    28613198
    2862         if ((mqtt_info = create_device_twin_message(transport_data, RETRIEVE_PROPERTIES, 0)) == NULL)
     3199        if ((mqtt_info = createDeviceTwinMsg(transport_data, RETRIEVE_PROPERTIES, 0)) == NULL)
    28633200        {
    28643201            LogError("Failed creating the device twin get request message");
     
    28693206        {
    28703207            LogError("Failed setting the get twin request enqueue time");
    2871             destroy_device_twin_get_message(mqtt_info);
     3208            destroyDeviceTwinGetMsg(mqtt_info);
    28723209            // Codes_SRS_IOTHUB_MQTT_TRANSPORT_09_003: [ If any failure occurs, IoTHubTransport_MQTT_Common_GetTwinAsync shall return IOTHUB_CLIENT_ERROR ]
    28733210            result = IOTHUB_CLIENT_ERROR;
     
    29493286
    29503287            /*Codes_SRS_IOTHUB_MQTT_TRANSPORT_12_011 : [IoTHubTransport_MQTT_Common_Unsubscribe_DeviceMethod shall send the unsubscribe.]*/
    2951             if (mqtt_client_unsubscribe(transport_data->mqttClient, get_next_packet_id(transport_data), unsubscribe, 1) != 0)
     3288            if (mqtt_client_unsubscribe(transport_data->mqttClient, getNextPacketId(transport_data), unsubscribe, 1) != 0)
    29523289            {
    29533290                LogError("Failure calling mqtt_client_unsubscribe");
     
    29813318        else
    29823319        {
    2983             if (publish_device_method_message(transport_data, status, dev_method_info->request_id, response, respSize) != 0)
     3320            if (publishDeviceMethodResponseMsg(transport_data, status, dev_method_info->request_id, response, respSize) != 0)
    29843321            {
    29853322                /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_07_051: [ If any error is encountered, IoTHubTransport_MQTT_Common_DeviceMethod_Response shall return a non-zero value. ] */
     
    30043341}
    30053342
    3006 static STRING_HANDLE buildTopicMqttMessage(const char* device_id, const char* module_id)
    3007 {
    3008     if (module_id == NULL)
    3009     {
    3010         return STRING_construct_sprintf(TOPIC_DEVICE_MSG, device_id);
    3011     }
    3012     else
    3013     {
    3014         return STRING_construct_sprintf(TOPIC_DEVICE_MODULE_MSG, device_id, module_id);
    3015     }
    3016 }
    3017 
    30183343int IoTHubTransport_MQTT_Common_Subscribe(TRANSPORT_LL_HANDLE handle)
    30193344{
     
    30293354    {
    30303355        /* Code_SRS_IOTHUB_MQTT_TRANSPORT_07_016: [IoTHubTransport_MQTT_Common_Subscribe shall set a flag to enable mqtt_client_subscribe to be called to subscribe to the Message Topic.] */
    3031         transport_data->topic_MqttMessage = buildTopicMqttMessage(STRING_c_str(transport_data->device_id), STRING_c_str(transport_data->module_id));
     3356        transport_data->topic_MqttMessage = buildTopicMqttMsg(STRING_c_str(transport_data->device_id), STRING_c_str(transport_data->module_id));
    30323357        if (transport_data->topic_MqttMessage == NULL)
    30333358        {
     
    30553380        const char* unsubscribe[1];
    30563381        unsubscribe[0] = STRING_c_str(transport_data->topic_MqttMessage);
    3057         if (mqtt_client_unsubscribe(transport_data->mqttClient, get_next_packet_id(transport_data), unsubscribe, 1) != 0)
     3382        if (mqtt_client_unsubscribe(transport_data->mqttClient, getNextPacketId(transport_data), unsubscribe, 1) != 0)
    30583383        {
    30593384            LogError("Failure calling mqtt_client_unsubscribe");
     
    30873412            if (item_type == IOTHUB_TYPE_DEVICE_TWIN && transport_data->twin_resp_sub_recv)
    30883413            {
    3089                 MQTT_DEVICE_TWIN_ITEM* mqtt_info = create_device_twin_message(transport_data, REPORTED_STATE, iothub_item->device_twin->item_id);
     3414                MQTT_DEVICE_TWIN_ITEM* mqtt_info = createDeviceTwinMsg(transport_data, REPORTED_STATE, iothub_item->device_twin->item_id);
    30903415                if (mqtt_info == NULL)
    30913416                {
     
    30993424                    DList_InsertTailList(&transport_data->ack_waiting_queue, &mqtt_info->entry);
    31003425
    3101                     if (publish_device_twin_message(transport_data, iothub_item->device_twin, mqtt_info) != 0)
     3426                    if (publishDeviceTwinMsg(transport_data, iothub_item->device_twin, mqtt_info) != 0)
    31023427                    {
    31033428                        DList_RemoveEntryList(&mqtt_info->entry);
     
    31353460    if (transport_data != NULL)
    31363461    {
    3137         if (InitializeConnection(transport_data) == 0)
     3462        if (UpdateMqttConnectionStateIfNeeded(transport_data) == 0)
    31383463        {
    31393464            if (transport_data->mqttClientStatus == MQTT_CLIENT_STATUS_PENDING_CLOSE)
     
    31483473            else if (transport_data->currPacketState == SUBACK_TYPE)
    31493474            {
    3150                 if ((transport_data->topic_NotifyState != NULL || transport_data->topic_GetState != NULL) &&
    3151                     !transport_data->device_twin_get_sent)
    3152                 {
    3153                     /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_055: [ IoTHubTransport_MQTT_Common_DoWork shall send a device twin get property message upon successfully retrieving a SUBACK on device twin topics. ] */
    3154                     MQTT_DEVICE_TWIN_ITEM* mqtt_info;
    3155 
    3156                     if ((mqtt_info = create_device_twin_message(transport_data, RETRIEVE_PROPERTIES, 0)) == NULL)
    3157                     {
    3158                         LogError("Failure: could not create message for twin get command");
    3159                     }
    3160                     else if (publish_device_twin_get_message(transport_data, mqtt_info) == 0)
    3161                     {
    3162                         transport_data->device_twin_get_sent = true;
    3163                     }
    3164                     else
    3165                     {
    3166                         LogError("Failure: sending device twin get property command.");
    3167                         destroy_device_twin_get_message(mqtt_info);
    3168                     }
    3169                 }
    3170                 // Publish can be called now
    3171                 transport_data->currPacketState = PUBLISH_TYPE;
     3475                ProcessSubackDoWork(transport_data);
    31723476            }
    31733477            else if (transport_data->currPacketState == PUBLISH_TYPE)
    31743478            {
    3175                 PDLIST_ENTRY currentListEntry = transport_data->waitingToSend->Flink;
    3176                 /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_027: [IoTHubTransport_MQTT_Common_DoWork shall inspect the "waitingToSend" DLIST passed in config structure.] */
    3177                 while (currentListEntry != transport_data->waitingToSend)
    3178                 {
    3179                     IOTHUB_MESSAGE_LIST* iothubMsgList = containingRecord(currentListEntry, IOTHUB_MESSAGE_LIST, entry);
    3180                     DLIST_ENTRY savedFromCurrentListEntry;
    3181                     savedFromCurrentListEntry.Flink = currentListEntry->Flink;
    3182 
    3183                     /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_027: [IoTHubTransport_MQTT_Common_DoWork shall inspect the "waitingToSend" DLIST passed in config structure.] */
    3184                     size_t messageLength;
    3185                     const unsigned char* messagePayload = NULL;
    3186                     if (!RetrieveMessagePayload(iothubMsgList->messageHandle, &messagePayload, &messageLength))
    3187                     {
    3188                         (void)(DList_RemoveEntryList(currentListEntry));
    3189                         sendMsgComplete(iothubMsgList, transport_data, IOTHUB_CLIENT_CONFIRMATION_ERROR);
    3190                         LogError("Failure result from IoTHubMessage_GetData");
    3191                     }
    3192                     else
    3193                     {
    3194                         /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_029: [IoTHubTransport_MQTT_Common_DoWork shall create a MQTT_MESSAGE_HANDLE and pass this to a call to mqtt_client_publish.] */
    3195                         MQTT_MESSAGE_DETAILS_LIST* mqttMsgEntry = (MQTT_MESSAGE_DETAILS_LIST*)malloc(sizeof(MQTT_MESSAGE_DETAILS_LIST));
    3196                         if (mqttMsgEntry == NULL)
    3197                         {
    3198                             LogError("Allocation Error: Failure allocating MQTT Message Detail List.");
    3199                         }
    3200                         else
    3201                         {
    3202                             mqttMsgEntry->retryCount = 0;
    3203                             mqttMsgEntry->iotHubMessageEntry = iothubMsgList;
    3204                             mqttMsgEntry->packet_id = get_next_packet_id(transport_data);
    3205                             if (publish_mqtt_telemetry_msg(transport_data, mqttMsgEntry, messagePayload, messageLength) != 0)
    3206                             {
    3207                                 (void)(DList_RemoveEntryList(currentListEntry));
    3208                                 sendMsgComplete(iothubMsgList, transport_data, IOTHUB_CLIENT_CONFIRMATION_ERROR);
    3209                                 free(mqttMsgEntry);
    3210                             }
    3211                             else
    3212                             {
    3213                                 // Remove the message from the waiting queue ...
    3214                                 (void)(DList_RemoveEntryList(currentListEntry));
    3215                                 // and add it to the ack queue
    3216                                 DList_InsertTailList(&(transport_data->telemetry_waitingForAck), &(mqttMsgEntry->entry));
    3217                             }
    3218                         }
    3219                     }
    3220                     currentListEntry = savedFromCurrentListEntry.Flink;
    3221                 }
    3222 
    3223                 if (transport_data->twin_resp_sub_recv)
    3224                 {
    3225                     sendPendingGetTwinRequests(transport_data);
    3226                 }
     3479                ProcessPublishStateDoWork(transport_data);
    32273480            }
    32283481            /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_030: [IoTHubTransport_MQTT_Common_DoWork shall call mqtt_client_dowork everytime it is called if it is connected.] */
     
    32313484
    32323485        // Check the ack messages timeouts
    3233         process_queued_ack_messages(transport_data);
     3486        ProcessPendingTelemetryMessages(transport_data);
    32343487        removeExpiredTwinRequests(transport_data);
    32353488    }
    32363489}
    32373490
     3491int IoTHubTransport_MQTT_Common_SetRetryPolicy(TRANSPORT_LL_HANDLE handle, IOTHUB_CLIENT_RETRY_POLICY retryPolicy, size_t retryTimeoutLimitInSeconds)
     3492{
     3493    int result;
     3494
     3495    if (handle == NULL)
     3496    {
     3497        /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_25_041: [**If any handle is NULL then IoTHubTransport_MQTT_Common_SetRetryPolicy shall return resultant line.] */
     3498        LogError("Invalid handle parameter. NULL.");
     3499        result = MU_FAILURE;
     3500    }
     3501    else
     3502    {
     3503        RETRY_CONTROL_HANDLE new_retry_control_handle;
     3504
     3505        // Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_09_006: [ IoTHubTransport_MQTT_Common_SetRetryPolicy shall set the retry logic by calling retry_control_create() with retry policy and retryTimeout as parameters]
     3506        // Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_09_009: [ If retry_control_create() fails then IoTHubTransport_MQTT_Common_SetRetryPolicy shall revert to previous retry policy and return non-zero value ]
     3507        if ((new_retry_control_handle = retry_control_create(retryPolicy, (unsigned int)retryTimeoutLimitInSeconds)) == NULL)
     3508        {
     3509            LogError("Failed creating new retry control handle");
     3510            result = MU_FAILURE;
     3511        }
     3512        else
     3513        {
     3514            PMQTTTRANSPORT_HANDLE_DATA transport_data = (PMQTTTRANSPORT_HANDLE_DATA)handle;
     3515            RETRY_CONTROL_HANDLE previous_retry_control_handle = transport_data->retry_control_handle;
     3516
     3517            transport_data->retry_control_handle = new_retry_control_handle;
     3518            retry_control_destroy(previous_retry_control_handle);
     3519
     3520            /*Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_25_045: [**If retry logic for specified parameters of retry policy and retryTimeoutLimitInSeconds is created successfully then IoTHubTransport_MQTT_Common_SetRetryPolicy shall return 0]*/
     3521            result = 0;
     3522        }
     3523    }
     3524
     3525    return result;
     3526}
     3527
     3528
    32383529IOTHUB_CLIENT_RESULT IoTHubTransport_MQTT_Common_GetSendStatus(TRANSPORT_LL_HANDLE handle, IOTHUB_CLIENT_STATUS *iotHubClientStatus)
    32393530{
     
    32423533    if (handle == NULL || iotHubClientStatus == NULL)
    32433534    {
    3244     /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_023: [IoTHubTransport_MQTT_Common_GetSendStatus shall return IOTHUB_CLIENT_INVALID_ARG if called with NULL parameter.] */
    3245     LogError("invalid arument.");
    3246     result = IOTHUB_CLIENT_INVALID_ARG;
    3247     }
    3248     else
    3249     {
    3250     MQTTTRANSPORT_HANDLE_DATA* handleData = (MQTTTRANSPORT_HANDLE_DATA*)handle;
    3251     if (!DList_IsListEmpty(handleData->waitingToSend) || !DList_IsListEmpty(&(handleData->telemetry_waitingForAck)))
    3252     {
    3253         /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_025: [IoTHubTransport_MQTT_Common_GetSendStatus shall return IOTHUB_CLIENT_OK and status IOTHUB_CLIENT_SEND_STATUS_BUSY if there are currently event items to be sent or being sent.] */
    3254         *iotHubClientStatus = IOTHUB_CLIENT_SEND_STATUS_BUSY;
    3255     }
    3256     else
    3257     {
    3258         /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_024: [IoTHubTransport_MQTT_Common_GetSendStatus shall return IOTHUB_CLIENT_OK and status IOTHUB_CLIENT_SEND_STATUS_IDLE if there are currently no event items to be sent or being sent.] */
    3259         *iotHubClientStatus = IOTHUB_CLIENT_SEND_STATUS_IDLE;
    3260     }
    3261     result = IOTHUB_CLIENT_OK;
     3535        /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_023: [IoTHubTransport_MQTT_Common_GetSendStatus shall return IOTHUB_CLIENT_INVALID_ARG if called with NULL parameter.] */
     3536        LogError("invalid argument.");
     3537        result = IOTHUB_CLIENT_INVALID_ARG;
     3538    }
     3539    else
     3540    {
     3541        MQTTTRANSPORT_HANDLE_DATA* handleData = (MQTTTRANSPORT_HANDLE_DATA*)handle;
     3542        if (!DList_IsListEmpty(handleData->waitingToSend) || !DList_IsListEmpty(&(handleData->telemetry_waitingForAck)))
     3543        {
     3544            /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_025: [IoTHubTransport_MQTT_Common_GetSendStatus shall return IOTHUB_CLIENT_OK and status IOTHUB_CLIENT_SEND_STATUS_BUSY if there are currently event items to be sent or being sent.] */
     3545            *iotHubClientStatus = IOTHUB_CLIENT_SEND_STATUS_BUSY;
     3546        }
     3547        else
     3548        {
     3549            /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_024: [IoTHubTransport_MQTT_Common_GetSendStatus shall return IOTHUB_CLIENT_OK and status IOTHUB_CLIENT_SEND_STATUS_IDLE if there are currently no event items to be sent or being sent.] */
     3550            *iotHubClientStatus = IOTHUB_CLIENT_SEND_STATUS_IDLE;
     3551        }
     3552        result = IOTHUB_CLIENT_OK;
    32623553    }
    32633554    return result;
     
    34233714                {
    34243715                    /* Codes_SRS_IOTHUB_TRANSPORT_MQTT_COMMON_01_009: [ When setting the proxy options succeeds any previously saved proxy options shall be freed. ]*/
    3425                     free_proxy_data(transport_data);
     3716                    freeProxyData(transport_data);
    34263717
    34273718                    transport_data->http_proxy_hostname = copied_proxy_hostname;
     
    34433734
    34443735            /* Codes_SRS_IOTHUB_MQTT_TRANSPORT_07_032: [IoTHubTransport_MQTT_Common_SetOption shall pass down the option to xio_setoption if the option parameter is not a known option string for the MQTT transport.] */
    3445             if (GetTransportProviderIfNecessary(transport_data) == 0)
     3736            if (CreateTransportProviderIfNecessary(transport_data) == 0)
    34463737            {
    34473738                if (xio_setoption(transport_data->xioTransport, option, value) == 0)
     
    34623753    }
    34633754    return result;
    3464 }
    3465 
    3466 static bool check_module_ids_equal(const char* transportModuleId, const char* deviceModuleId)
    3467 {
    3468     if ((transportModuleId != NULL) && (deviceModuleId == NULL))
    3469     {
    3470         return false;
    3471     }
    3472     else if ((transportModuleId == NULL) && (deviceModuleId != NULL))
    3473     {
    3474         return false;
    3475     }
    3476     else if ((transportModuleId == NULL) && (deviceModuleId == NULL))
    3477     {
    3478         return true;
    3479     }
    3480     else
    3481     {
    3482         return (0 == strcmp(transportModuleId, deviceModuleId));
    3483     }
    34843755}
    34853756
     
    35193790                result = NULL;
    35203791            }
    3521             else if (!check_module_ids_equal(STRING_c_str(transport_data->module_id), device->moduleId))
     3792            else if (!checkModuleIdsEqual(STRING_c_str(transport_data->module_id), device->moduleId))
    35223793            {
    35233794                LogError("IoTHubTransport_MQTT_Common_Register: moduleId does not match.");
     
    36503921        const char* unsubscribe[1];
    36513922        unsubscribe[0] = STRING_c_str(transport_data->topic_InputQueue);
    3652         if (mqtt_client_unsubscribe(transport_data->mqttClient, get_next_packet_id(transport_data), unsubscribe, 1) != 0)
     3923        if (mqtt_client_unsubscribe(transport_data->mqttClient, getNextPacketId(transport_data), unsubscribe, 1) != 0)
    36533924        {
    36543925            LogError("Failure calling mqtt_client_unsubscribe");
Note: See TracChangeset for help on using the changeset viewer.