source: azure_iot_hub/trunk/azure_iohub/iothub_client/src/iothubtransport.c@ 388

Last change on this file since 388 was 388, checked in by coas-nagasima, 5 years ago

Azure IoT Hub Device C SDK を使ったサンプルの追加

  • Property svn:eol-style set to native
  • Property svn:keywords set to Id
  • Property svn:mime-type set to text/x-csrc
File size: 19.4 KB
Line 
1// Copyright (c) Microsoft. All rights reserved.
2// Licensed under the MIT license. See LICENSE file in the project root for full license information.
3
4#include <stdlib.h>
5#include <signal.h>
6#include <stddef.h>
7#include "azure_c_shared_utility/gballoc.h"
8#include "azure_c_shared_utility/crt_abstractions.h"
9#include "internal/iothubtransport.h"
10#include "iothub_client_core.h"
11#include "internal/iothub_client_private.h"
12#include "azure_c_shared_utility/threadapi.h"
13#include "azure_c_shared_utility/lock.h"
14#include "azure_c_shared_utility/xlogging.h"
15#include "azure_c_shared_utility/vector.h"
16
17#include "internal/iothubtransport.h"
18#include "internal/iothub_client_private.h"
19#include "iothub_transport_ll.h"
20#include "iothub_client_core.h"
21
22typedef struct TRANSPORT_HANDLE_DATA_TAG
23{
24 TRANSPORT_LL_HANDLE transportLLHandle;
25 THREAD_HANDLE workerThreadHandle;
26 LOCK_HANDLE lockHandle;
27 sig_atomic_t stopThread;
28 TRANSPORT_PROVIDER_FIELDS;
29 VECTOR_HANDLE clients;
30 LOCK_HANDLE clientsLockHandle;
31 IOTHUB_CLIENT_MULTIPLEXED_DO_WORK clientDoWork;
32} TRANSPORT_HANDLE_DATA;
33
34/* Used for Unit test */
35const size_t IoTHubTransport_ThreadTerminationOffset = offsetof(TRANSPORT_HANDLE_DATA, stopThread);
36
37TRANSPORT_HANDLE IoTHubTransport_Create(IOTHUB_CLIENT_TRANSPORT_PROVIDER protocol, const char* iotHubName, const char* iotHubSuffix)
38{
39 TRANSPORT_HANDLE_DATA *result;
40 TRANSPORT_CALLBACKS_INFO transport_cb;
41
42 if (protocol == NULL || iotHubName == NULL || iotHubSuffix == NULL)
43 {
44 /*Codes_SRS_IOTHUBTRANSPORT_17_002: [ If protocol is NULL, this function shall return NULL. ]*/
45 /*Codes_SRS_IOTHUBTRANSPORT_17_003: [ If iotHubName is NULL, this function shall return NULL. ]*/
46 /*Codes_SRS_IOTHUBTRANSPORT_17_004: [ If iotHubSuffix is NULL, this function shall return NULL. ]*/
47 LogError("Invalid NULL argument, protocol [%p], name [%p], suffix [%p].", protocol, iotHubName, iotHubSuffix);
48 result = NULL;
49 }
50 else if (IoTHubClientCore_LL_GetTransportCallbacks(&transport_cb) != 0)
51 {
52 LogError("Failure getting transport callbacks");
53 result = NULL;
54 }
55 else
56 {
57 /*Codes_SRS_IOTHUBTRANSPORT_17_032: [ IoTHubTransport_Create shall allocate memory for the transport data. ]*/
58 result = (TRANSPORT_HANDLE_DATA*)malloc(sizeof(TRANSPORT_HANDLE_DATA));
59 if (result == NULL)
60 {
61 /*Codes_SRS_IOTHUBTRANSPORT_17_040: [ If memory allocation fails, IoTHubTransport_Create shall return NULL. ]*/
62 LogError("Transport handle was not allocated.");
63 }
64 else
65 {
66 TRANSPORT_PROVIDER * transportProtocol = (TRANSPORT_PROVIDER*)(protocol());
67 IOTHUB_CLIENT_CONFIG upperConfig;
68 upperConfig.deviceId = NULL;
69 upperConfig.deviceKey = NULL;
70 upperConfig.iotHubName = iotHubName;
71 upperConfig.iotHubSuffix = iotHubSuffix;
72 upperConfig.protocol = protocol;
73 upperConfig.protocolGatewayHostName = NULL;
74
75 IOTHUBTRANSPORT_CONFIG transportLLConfig;
76 memset(&transportLLConfig, 0, sizeof(IOTHUBTRANSPORT_CONFIG));
77 transportLLConfig.upperConfig = &upperConfig;
78 transportLLConfig.waitingToSend = NULL;
79
80 /*Codes_SRS_IOTHUBTRANSPORT_17_005: [ IoTHubTransport_Create shall create the lower layer transport by calling the protocol's IoTHubTransport_Create function. ]*/
81 result->transportLLHandle = transportProtocol->IoTHubTransport_Create(&transportLLConfig, &transport_cb, NULL);
82 if (result->transportLLHandle == NULL)
83 {
84 /*Codes_SRS_IOTHUBTRANSPORT_17_006: [ If the creation of the transport fails, IoTHubTransport_Create shall return NULL. ]*/
85 LogError("Lower Layer transport not created.");
86 free(result);
87 result = NULL;
88 }
89 else
90 {
91 /*Codes_SRS_IOTHUBTRANSPORT_17_007: [ IoTHubTransport_Create shall create the transport lock by Calling Lock_Init. ]*/
92 result->lockHandle = Lock_Init();
93 if (result->lockHandle == NULL)
94 {
95 /*Codes_SRS_IOTHUBTRANSPORT_17_008: [ If the lock creation fails, IoTHubTransport_Create shall return NULL. ]*/
96 LogError("transport Lock not created.");
97 transportProtocol->IoTHubTransport_Destroy(result->transportLLHandle);
98 free(result);
99 result = NULL;
100 }
101 else if ((result->clientsLockHandle = Lock_Init()) == NULL)
102 {
103 LogError("clients Lock not created.");
104 Lock_Deinit(result->lockHandle);
105 transportProtocol->IoTHubTransport_Destroy(result->transportLLHandle);
106 free(result);
107 result = NULL;
108 }
109 else
110 {
111 /*Codes_SRS_IOTHUBTRANSPORT_17_038: [ IoTHubTransport_Create shall call VECTOR_Create to make a list of IOTHUB_CLIENT_CORE_HANDLE using this transport. ]*/
112 result->clients = VECTOR_create(sizeof(IOTHUB_CLIENT_CORE_HANDLE));
113 if (result->clients == NULL)
114 {
115 /*Codes_SRS_IOTHUBTRANSPORT_17_039: [ If the Vector creation fails, IoTHubTransport_Create shall return NULL. ]*/
116 /*Codes_SRS_IOTHUBTRANSPORT_17_009: [ IoTHubTransport_Create shall clean up any resources it creates if the function does not succeed. ]*/
117 LogError("clients list not created.");
118 Lock_Deinit(result->clientsLockHandle);
119 Lock_Deinit(result->lockHandle);
120 transportProtocol->IoTHubTransport_Destroy(result->transportLLHandle);
121 free(result);
122 result = NULL;
123 }
124 else
125 {
126 /*Codes_SRS_IOTHUBTRANSPORT_17_001: [ IoTHubTransport_Create shall return a non-NULL handle on success.]*/
127 result->stopThread = 1;
128 result->clientDoWork = NULL;
129 result->workerThreadHandle = NULL; /* create thread when work needs to be done */
130 result->IoTHubTransport_GetHostname = transportProtocol->IoTHubTransport_GetHostname;
131 result->IoTHubTransport_SetOption = transportProtocol->IoTHubTransport_SetOption;
132 result->IoTHubTransport_Create = transportProtocol->IoTHubTransport_Create;
133 result->IoTHubTransport_Destroy = transportProtocol->IoTHubTransport_Destroy;
134 result->IoTHubTransport_Register = transportProtocol->IoTHubTransport_Register;
135 result->IoTHubTransport_Unregister = transportProtocol->IoTHubTransport_Unregister;
136 result->IoTHubTransport_Subscribe = transportProtocol->IoTHubTransport_Subscribe;
137 result->IoTHubTransport_Unsubscribe = transportProtocol->IoTHubTransport_Unsubscribe;
138 result->IoTHubTransport_DoWork = transportProtocol->IoTHubTransport_DoWork;
139 result->IoTHubTransport_SetRetryPolicy = transportProtocol->IoTHubTransport_SetRetryPolicy;
140 result->IoTHubTransport_GetSendStatus = transportProtocol->IoTHubTransport_GetSendStatus;
141 }
142 }
143 }
144 }
145 }
146
147 return result;
148}
149
150static void multiplexed_client_do_work(TRANSPORT_HANDLE_DATA* transportData)
151{
152 if (Lock(transportData->clientsLockHandle) != LOCK_OK)
153 {
154 LogError("failed to lock for multiplexed_client_do_work");
155 }
156 else
157 {
158 size_t numberOfClients;
159 size_t iterator;
160
161 numberOfClients = VECTOR_size(transportData->clients);
162 for (iterator = 0; iterator < numberOfClients; iterator++)
163 {
164 IOTHUB_CLIENT_CORE_HANDLE* clientHandle = (IOTHUB_CLIENT_CORE_HANDLE*)VECTOR_element(transportData->clients, iterator);
165
166 if (clientHandle != NULL)
167 {
168 transportData->clientDoWork(*clientHandle);
169 }
170 }
171
172 if (Unlock(transportData->clientsLockHandle) != LOCK_OK)
173 {
174 LogError("failed to unlock on multiplexed_client_do_work");
175 }
176 }
177}
178
179static int transport_worker_thread(void* threadArgument)
180{
181 TRANSPORT_HANDLE_DATA* transportData = (TRANSPORT_HANDLE_DATA*)threadArgument;
182
183 while (1)
184 {
185 /*Codes_SRS_IOTHUBTRANSPORT_17_030: [ All calls to lower layer transport DoWork shall be protected by the lock created in IoTHubTransport_Create. ]*/
186 if (Lock(transportData->lockHandle) == LOCK_OK)
187 {
188 /*Codes_SRS_IOTHUBTRANSPORT_17_031: [ If acquiring the lock fails, lower layer transport DoWork shall not be called. ]*/
189 if (transportData->stopThread)
190 {
191 /*Codes_SRS_IOTHUBTRANSPORT_17_028: [ The thread shall exit when IoTHubTransport_EndWorkerThread has been called for each clientHandle which invoked IoTHubTransport_StartWorkerThread. ]*/
192 (void)Unlock(transportData->lockHandle);
193 break;
194 }
195 else
196 {
197 (transportData->IoTHubTransport_DoWork)(transportData->transportLLHandle);
198
199 (void)Unlock(transportData->lockHandle);
200 }
201 }
202
203 multiplexed_client_do_work(transportData);
204
205 /*Codes_SRS_IOTHUBTRANSPORT_17_029: [ The thread shall call lower layer transport DoWork every 1 ms. ]*/
206 ThreadAPI_Sleep(1);
207 }
208
209 ThreadAPI_Exit(0);
210 return 0;
211}
212
213static bool find_by_handle(const void* element, const void* value)
214{
215 /* data stored at element is device handle */
216 const IOTHUB_CLIENT_CORE_HANDLE * guess = (const IOTHUB_CLIENT_CORE_HANDLE *)element;
217 const IOTHUB_CLIENT_CORE_HANDLE match = (const IOTHUB_CLIENT_CORE_HANDLE)value;
218 return (*guess == match);
219}
220
221static IOTHUB_CLIENT_RESULT start_worker_if_needed(TRANSPORT_HANDLE_DATA * transportData, IOTHUB_CLIENT_CORE_HANDLE clientHandle)
222{
223 IOTHUB_CLIENT_RESULT result;
224 if (transportData->workerThreadHandle == NULL)
225 {
226 /*Codes_SRS_IOTHUBTRANSPORT_17_018: [ If the worker thread does not exist, IoTHubTransport_StartWorkerThread shall start the thread using ThreadAPI_Create. ]*/
227 transportData->stopThread = 0;
228 if (ThreadAPI_Create(&transportData->workerThreadHandle, transport_worker_thread, transportData) != THREADAPI_OK)
229 {
230 transportData->workerThreadHandle = NULL;
231 }
232 }
233 if (transportData->workerThreadHandle != NULL)
234 {
235 if (Lock(transportData->clientsLockHandle) != LOCK_OK)
236 {
237 LogError("failed to lock for start_worker_if_needed");
238 result = IOTHUB_CLIENT_ERROR;
239 }
240 else
241 {
242 /*Codes_SRS_IOTHUBTRANSPORT_17_020: [ IoTHubTransport_StartWorkerThread shall search for IoTHubClient clientHandle in the list of IoTHubClient handles. ]*/
243 bool addToList = ((VECTOR_size(transportData->clients) == 0) || (VECTOR_find_if(transportData->clients, find_by_handle, clientHandle) == NULL));
244 if (addToList)
245 {
246 /*Codes_SRS_IOTHUBTRANSPORT_17_021: [ If handle is not found, then clientHandle shall be added to the list. ]*/
247 if (VECTOR_push_back(transportData->clients, &clientHandle, 1) != 0)
248 {
249 LogError("Failed adding device to list (VECTOR_push_back failed)");
250 /*Codes_SRS_IOTHUBTRANSPORT_17_042: [ If Adding to the client list fails, IoTHubTransport_StartWorkerThread shall return IOTHUB_CLIENT_ERROR. ]*/
251 result = IOTHUB_CLIENT_ERROR;
252 }
253 else
254 {
255 result = IOTHUB_CLIENT_OK;
256 }
257 }
258 else
259 {
260 result = IOTHUB_CLIENT_OK;
261 }
262
263 if (Unlock(transportData->clientsLockHandle) != LOCK_OK)
264 {
265 LogError("failed to unlock on start_worker_if_needed");
266 }
267 }
268 }
269 else
270 {
271 result = IOTHUB_CLIENT_ERROR;
272 }
273 return result;
274}
275
276static void stop_worker_thread(TRANSPORT_HANDLE_DATA* transportData)
277{
278 /*Codes_SRS_IOTHUBTRANSPORT_17_043: [** IoTHubTransport_SignalEndWorkerThread shall signal the worker thread to end.*/
279 if (Lock(transportData->lockHandle) != LOCK_OK)
280 {
281 // Need to setup a critical error function here to inform the user that an critical error
282 // has occured.
283 LogError("Unable to lock - will still attempt to end thread without thread safety");
284 transportData->stopThread = 1;
285 }
286 else
287 {
288 transportData->stopThread = 1;
289 (void)Unlock(transportData->lockHandle);
290 }
291
292}
293
294static void wait_worker_thread(TRANSPORT_HANDLE_DATA * transportData)
295{
296 if (transportData->workerThreadHandle != NULL)
297 {
298 int res;
299 /*Codes_SRS_IOTHUBTRANSPORT_17_027: [ If handle list is empty, IoTHubTransport_EndWorkerThread shall be joined. ]*/
300 if (ThreadAPI_Join(transportData->workerThreadHandle, &res) != THREADAPI_OK)
301 {
302 LogError("ThreadAPI_Join failed");
303 }
304 else
305 {
306 transportData->workerThreadHandle = NULL;
307 }
308 }
309}
310
311static bool signal_end_worker_thread(TRANSPORT_HANDLE_DATA * transportData, IOTHUB_CLIENT_CORE_HANDLE clientHandle)
312{
313 bool okToJoin;
314
315 if (Lock(transportData->clientsLockHandle) != LOCK_OK)
316 {
317 LogError("failed to lock for signal_end_worker_thread");
318 okToJoin = false;
319 }
320 else
321 {
322 void* element = VECTOR_find_if(transportData->clients, find_by_handle, clientHandle);
323 if (element != NULL)
324 {
325 /*Codes_SRS_IOTHUBTRANSPORT_17_026: [ IoTHubTransport_EndWorkerThread shall remove clientHandlehandle from handle list. ]*/
326 VECTOR_erase(transportData->clients, element, 1);
327 }
328 /*Codes_SRS_IOTHUBTRANSPORT_17_025: [ If the worker thread does not exist, then IoTHubTransport_EndWorkerThread shall return. ]*/
329 if (transportData->workerThreadHandle != NULL)
330 {
331 if (VECTOR_size(transportData->clients) == 0)
332 {
333 stop_worker_thread(transportData);
334 okToJoin = true;
335 }
336 else
337 {
338 okToJoin = false;
339 }
340 }
341 else
342 {
343 okToJoin = false;
344 }
345
346 if (Unlock(transportData->clientsLockHandle) != LOCK_OK)
347 {
348 LogError("failed to unlock on signal_end_worker_thread");
349 }
350 }
351 return okToJoin;
352}
353
354void IoTHubTransport_Destroy(TRANSPORT_HANDLE transportHandle)
355{
356 /*Codes_SRS_IOTHUBTRANSPORT_17_011: [ IoTHubTransport_Destroy shall do nothing if transportHandle is NULL. ]*/
357 if (transportHandle != NULL)
358 {
359 TRANSPORT_HANDLE_DATA * transportData = (TRANSPORT_HANDLE_DATA*)transportHandle;
360 /*Codes_SRS_IOTHUBTRANSPORT_17_033: [ IoTHubTransport_Destroy shall lock the transport lock. ]*/
361 stop_worker_thread(transportData);
362 wait_worker_thread(transportData);
363 /*Codes_SRS_IOTHUBTRANSPORT_17_010: [ IoTHubTransport_Destroy shall free all resources. ]*/
364 Lock_Deinit(transportData->lockHandle);
365 (transportData->IoTHubTransport_Destroy)(transportData->transportLLHandle);
366 VECTOR_destroy(transportData->clients);
367 Lock_Deinit(transportData->clientsLockHandle);
368 free(transportHandle);
369 }
370}
371
372LOCK_HANDLE IoTHubTransport_GetLock(TRANSPORT_HANDLE transportHandle)
373{
374 LOCK_HANDLE lock;
375 if (transportHandle == NULL)
376 {
377 /*Codes_SRS_IOTHUBTRANSPORT_17_013: [ If transportHandle is NULL, IoTHubTransport_GetLock shall return NULL. ]*/
378 lock = NULL;
379 }
380 else
381 {
382 /*Codes_SRS_IOTHUBTRANSPORT_17_012: [ IoTHubTransport_GetLock shall return a handle to the transport lock. ]*/
383 TRANSPORT_HANDLE_DATA * transportData = (TRANSPORT_HANDLE_DATA*)transportHandle;
384 lock = transportData->lockHandle;
385 }
386 return lock;
387}
388
389TRANSPORT_LL_HANDLE IoTHubTransport_GetLLTransport(TRANSPORT_HANDLE transportHandle)
390{
391 TRANSPORT_LL_HANDLE llTransport;
392 if (transportHandle == NULL)
393 {
394 /*Codes_SRS_IOTHUBTRANSPORT_17_015: [ If transportHandle is NULL, IoTHubTransport_GetLLTransport shall return NULL. ]*/
395 llTransport = NULL;
396 }
397 else
398 {
399 /*Codes_SRS_IOTHUBTRANSPORT_17_014: [ IoTHubTransport_GetLLTransport shall return a handle to the lower layer transport. ]*/
400 TRANSPORT_HANDLE_DATA * transportData = (TRANSPORT_HANDLE_DATA*)transportHandle;
401 llTransport = transportData->transportLLHandle;
402 }
403 return llTransport;
404}
405
406IOTHUB_CLIENT_RESULT IoTHubTransport_StartWorkerThread(TRANSPORT_HANDLE transportHandle, IOTHUB_CLIENT_CORE_HANDLE clientHandle, IOTHUB_CLIENT_MULTIPLEXED_DO_WORK muxDoWork)
407{
408 IOTHUB_CLIENT_RESULT result;
409 if (transportHandle == NULL || clientHandle == NULL)
410 {
411 /*Codes_SRS_IOTHUBTRANSPORT_17_016: [ If transportHandle is NULL, IoTHubTransport_StartWorkerThread shall return IOTHUB_CLIENT_INVALID_ARG. ]*/
412 /*Codes_SRS_IOTHUBTRANSPORT_17_017: [ If clientHandle is NULL, IoTHubTransport_StartWorkerThread shall return IOTHUB_CLIENT_INVALID_ARG. ]*/
413 result = IOTHUB_CLIENT_INVALID_ARG;
414 }
415 else
416 {
417 TRANSPORT_HANDLE_DATA * transportData = (TRANSPORT_HANDLE_DATA*)transportHandle;
418
419 if (transportData->clientDoWork == NULL)
420 {
421 transportData->clientDoWork = muxDoWork;
422 }
423
424 if ((result = start_worker_if_needed(transportData, clientHandle)) != IOTHUB_CLIENT_OK)
425 {
426 /*Codes_SRS_IOTHUBTRANSPORT_17_019: [ If thread creation fails, IoTHubTransport_StartWorkerThread shall return IOTHUB_CLIENT_ERROR. */
427 LogError("Unable to start thread safely");
428 }
429 else
430 {
431 /*Codes_SRS_IOTHUBTRANSPORT_17_022: [ Upon success, IoTHubTransport_StartWorkerThread shall return IOTHUB_CLIENT_OK. ]*/
432 result = IOTHUB_CLIENT_OK;
433 }
434 }
435 return result;
436}
437
438bool IoTHubTransport_SignalEndWorkerThread(TRANSPORT_HANDLE transportHandle, IOTHUB_CLIENT_CORE_HANDLE clientHandle)
439{
440 bool okToJoin;
441 /*Codes_SRS_IOTHUBTRANSPORT_17_023: [ If transportHandle is NULL, IoTHubTransport_EndWorkerThread shall return. ]*/
442 /*Codes_SRS_IOTHUBTRANSPORT_17_024: [ If clientHandle is NULL, IoTHubTransport_EndWorkerThread shall return. ]*/
443 if (!(transportHandle == NULL || clientHandle == NULL))
444 {
445 TRANSPORT_HANDLE_DATA * transportData = (TRANSPORT_HANDLE_DATA*)transportHandle;
446 okToJoin = signal_end_worker_thread(transportData, clientHandle);
447 }
448 else
449 {
450 okToJoin = false;
451 }
452 return okToJoin;
453}
454
455void IoTHubTransport_JoinWorkerThread(TRANSPORT_HANDLE transportHandle, IOTHUB_CLIENT_CORE_HANDLE clientHandle)
456{
457 /*Codes_SRS_IOTHUBTRANSPORT_17_044: [ If transportHandle is NULL, IoTHubTransport_JoinWorkerThread shall do nothing. ]*/
458 /*Codes_SRS_IOTHUBTRANSPORT_17_045: [ If clientHandle is NULL, IoTHubTransport_JoinWorkerThread shall do nothing. ]*/
459 if (!(transportHandle == NULL || clientHandle == NULL))
460 {
461 TRANSPORT_HANDLE_DATA * transportData = (TRANSPORT_HANDLE_DATA*)transportHandle;
462 /*Codes_SRS_IOTHUBTRANSPORT_17_027: [ The worker thread shall be joined. ]*/
463 wait_worker_thread(transportData);
464 }
465}
Note: See TracBrowser for help on using the repository browser.