/*
 * Source: azure_iot_hub/trunk/azure_iohub/umqtt/src/mqtt_codec.c @ r388
 *
 * Last change on this file since r388 was r388, checked in by coas-nagasima, 5 years ago.
 *
 * Commit message: "Azure IoT Hub Device C SDK を使ったサンプルの追加"
 *                 (Add samples that use the Azure IoT Hub Device C SDK)
 *
 *   - Property svn:eol-style set to native
 *   - Property svn:keywords set to Id
 *   - Property svn:mime-type set to text/x-csrc
 * File size: 39.8 KB
 */
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
3
#include <stdlib.h>
#include <stddef.h>
#include <stdint.h>
#include <stdbool.h>
#include <string.h>
#include <limits.h>
#include <inttypes.h>
#include "azure_c_shared_utility/optimize_size.h"
#include "azure_c_shared_utility/gballoc.h"
#include "azure_c_shared_utility/buffer_.h"
#include "azure_c_shared_utility/strings.h"
#include "azure_c_shared_utility/macro_utils.h"
#include "azure_c_shared_utility/xlogging.h"
#include "azure_umqtt_c/mqtt_codec.h"
14
/* Not referenced by any code visible in this file. */
#define PAYLOAD_OFFSET                      5

/* First byte of a packet: the upper nibble is converted to a CONTROL_PACKET_TYPE,
   the lower nibble carries the packet flags. Both expansions are fully
   parenthesized so they can be used inside larger expressions. */
#define PACKET_TYPE_BYTE(p)                 ((CONTROL_PACKET_TYPE)((uint8_t)(((uint8_t)(p)) & 0xf0)))
#define FLAG_VALUE_BYTE(p)                  ((uint8_t)(((uint8_t)(p)) & 0xf))

/* Bits OR-ed into the CONNECT flags byte by constructConnPayload. */
#define USERNAME_FLAG                       0x80
#define PASSWORD_FLAG                       0x40
#define WILL_RETAIN_FLAG                    0x20
#define WILL_QOS_FLAG_                      0x18
#define WILL_FLAG_FLAG                      0x04
#define CLEAN_SESSION_FLAG                  0x02

/* Continuation bit of a remaining-length byte; also used as the base-128 multiplier step. */
#define NEXT_128_CHUNK                      0x80

/* Flag bits placed in the low nibble of the first PUBLISH byte. */
#define PUBLISH_DUP_FLAG                    0x8
#define PUBLISH_QOS_EXACTLY_ONCE            0x4
#define PUBLISH_QOS_AT_LEAST_ONCE           0x2
#define PUBLISH_QOS_RETAIN                  0x1

/* Protocol level byte written after the "MQTT" protocol name in CONNECT. */
#define PROTOCOL_NUMBER                     4
/* Index of the connect-flags byte: 2 length bytes + "MQTT" + protocol level byte. */
#define CONN_FLAG_BYTE_OFFSET               7

#define CONNECT_FIXED_HEADER_SIZE           2
/* Bytes written by constructConnectVariableHeader: name (2 + 4), level, flags, keep-alive (2). */
#define CONNECT_VARIABLE_HEADER_SIZE        10
#define SUBSCRIBE_FIXED_HEADER_FLAG         0x2
#define UNSUBSCRIBE_FIXED_HEADER_FLAG       0x2

/* Largest payload length accepted by mqtt_codec_publish (see SRS_MQTT_CODEC_07_036). */
#define MAX_SEND_SIZE                       0xFFFFFF7F

/* Receive state machine states, consumed by MU_DEFINE_ENUM below. */
#define CODEC_STATE_VALUES      \
    CODEC_STATE_FIXED_HEADER,   \
    CODEC_STATE_VAR_HEADER,     \
    CODEC_STATE_PAYLOAD

/* Text used for boolean values in trace output. */
static const char* const TRUE_CONST = "true";
static const char* const FALSE_CONST = "false";

MU_DEFINE_ENUM(CODEC_STATE_RESULT, CODEC_STATE_VALUES);
51
52typedef struct MQTTCODEC_INSTANCE_TAG
53{
54 CONTROL_PACKET_TYPE currPacket;
55 CODEC_STATE_RESULT codecState;
56 size_t bufferOffset;
57 int headerFlags;
58 BUFFER_HANDLE headerData;
59 ON_PACKET_COMPLETE_CALLBACK packetComplete;
60 void* callContext;
61 uint8_t storeRemainLen[4];
62 size_t remainLenIndex;
63} MQTTCODEC_INSTANCE;
64
65typedef struct PUBLISH_HEADER_INFO_TAG
66{
67 const char* topicName;
68 uint16_t packetId;
69 const char* msgBuffer;
70 QOS_VALUE qualityOfServiceValue;
71} PUBLISH_HEADER_INFO;
72
73static const char* retrieve_qos_value(QOS_VALUE value)
74{
75 switch (value)
76 {
77 case DELIVER_AT_MOST_ONCE:
78 return "DELIVER_AT_MOST_ONCE";
79 case DELIVER_AT_LEAST_ONCE:
80 return "DELIVER_AT_LEAST_ONCE";
81 case DELIVER_EXACTLY_ONCE:
82 default:
83 return "DELIVER_EXACTLY_ONCE";
84 }
85}
86
/* Stores one byte at the write cursor and advances the cursor by one.
 *
 * buffer: address of the write cursor. Nothing happens when either the
 *         address or the cursor it holds is NULL.
 * value:  byte to store.
 *
 * The caller must guarantee that at least one byte is writable at the cursor. */
static void byteutil_writeByte(uint8_t** buffer, uint8_t value)
{
    if (buffer != NULL && *buffer != NULL)
    {
        **buffer = value;
        (*buffer)++;
    }
}
95
/* Stores a 16-bit value at the write cursor, most significant byte first,
 * and advances the cursor by two.
 *
 * buffer: address of the write cursor. Nothing happens when either the
 *         address or the cursor it holds is NULL.
 * value:  value to store.
 *
 * The caller must guarantee that at least two bytes are writable at the cursor. */
static void byteutil_writeInt(uint8_t** buffer, uint16_t value)
{
    if (buffer != NULL && *buffer != NULL)
    {
        /* Unsigned byte extraction; avoids going through a possibly signed char. */
        **buffer = (uint8_t)(value >> 8);
        (*buffer)++;
        **buffer = (uint8_t)(value & 0xFF);
        (*buffer)++;
    }
}
106
/* Writes a length-prefixed string at the write cursor: a 16-bit length (most
 * significant byte first) followed by `len` bytes of stringData. The cursor
 * is advanced past everything written.
 *
 * buffer:     address of the write cursor. Nothing happens when either the
 *             address or the cursor it holds is NULL.
 * stringData: bytes to copy. A NULL pointer is written as an empty string
 *             (length prefix 0, no data), so memcpy is never handed NULL.
 * len:        number of bytes of stringData to write.
 *
 * The caller must guarantee that 2 + len bytes are writable at the cursor. */
static void byteutil_writeUTF(uint8_t** buffer, const char* stringData, uint16_t len)
{
    if (buffer != NULL && *buffer != NULL)
    {
        uint16_t dataLen = (stringData != NULL) ? len : 0;

        /* Length prefix, written inline in the same layout byteutil_writeInt produces. */
        **buffer = (uint8_t)(dataLen >> 8);
        (*buffer)++;
        **buffer = (uint8_t)(dataLen & 0xFF);
        (*buffer)++;

        if (dataLen > 0)
        {
            (void)memcpy(*buffer, stringData, dataLen);
            *buffer += dataLen;
        }
    }
}
116
117static CONTROL_PACKET_TYPE processControlPacketType(uint8_t pktByte, int* flags)
118{
119 CONTROL_PACKET_TYPE result;
120 result = PACKET_TYPE_BYTE(pktByte);
121 if (flags != NULL)
122 {
123 *flags = FLAG_VALUE_BYTE(pktByte);
124 }
125 return result;
126}
127
128static int addListItemsToUnsubscribePacket(BUFFER_HANDLE ctrlPacket, const char** payloadList, size_t payloadCount, STRING_HANDLE trace_log)
129{
130 int result = 0;
131 if (payloadList == NULL || ctrlPacket == NULL)
132 {
133 result = MU_FAILURE;
134 }
135 else
136 {
137 size_t index = 0;
138 for (index = 0; index < payloadCount && result == 0; index++)
139 {
140 // Add the Payload
141 size_t offsetLen = BUFFER_length(ctrlPacket);
142 size_t topicLen = strlen(payloadList[index]);
143 if (topicLen > USHRT_MAX)
144 {
145 result = MU_FAILURE;
146 }
147 else if (BUFFER_enlarge(ctrlPacket, topicLen + 2) != 0)
148 {
149 result = MU_FAILURE;
150 }
151 else
152 {
153 uint8_t* iterator = BUFFER_u_char(ctrlPacket);
154 iterator += offsetLen;
155 byteutil_writeUTF(&iterator, payloadList[index], (uint16_t)topicLen);
156 }
157 if (trace_log != NULL)
158 {
159 STRING_sprintf(trace_log, " | TOPIC_NAME: %s", payloadList[index]);
160 }
161 }
162 }
163 return result;
164}
165
166static int addListItemsToSubscribePacket(BUFFER_HANDLE ctrlPacket, SUBSCRIBE_PAYLOAD* payloadList, size_t payloadCount, STRING_HANDLE trace_log)
167{
168 int result = 0;
169 if (payloadList == NULL || ctrlPacket == NULL)
170 {
171 result = MU_FAILURE;
172 }
173 else
174 {
175 size_t index = 0;
176 for (index = 0; index < payloadCount && result == 0; index++)
177 {
178 // Add the Payload
179 size_t offsetLen = BUFFER_length(ctrlPacket);
180 size_t topicLen = strlen(payloadList[index].subscribeTopic);
181 if (topicLen > USHRT_MAX)
182 {
183 result = MU_FAILURE;
184 }
185 else if (BUFFER_enlarge(ctrlPacket, topicLen + 2 + 1) != 0)
186 {
187 result = MU_FAILURE;
188 }
189 else
190 {
191 uint8_t* iterator = BUFFER_u_char(ctrlPacket);
192 iterator += offsetLen;
193 byteutil_writeUTF(&iterator, payloadList[index].subscribeTopic, (uint16_t)topicLen);
194 *iterator = payloadList[index].qosReturn;
195
196 if (trace_log != NULL)
197 {
198 STRING_sprintf(trace_log, " | TOPIC_NAME: %s | QOS: %d", payloadList[index].subscribeTopic, (int)payloadList[index].qosReturn);
199 }
200 }
201 }
202 }
203 return result;
204}
205
206static int constructConnectVariableHeader(BUFFER_HANDLE ctrlPacket, const MQTT_CLIENT_OPTIONS* mqttOptions, STRING_HANDLE trace_log)
207{
208 int result = 0;
209 if (BUFFER_enlarge(ctrlPacket, CONNECT_VARIABLE_HEADER_SIZE) != 0)
210 {
211 result = MU_FAILURE;
212 }
213 else
214 {
215 uint8_t* iterator = BUFFER_u_char(ctrlPacket);
216 if (iterator == NULL)
217 {
218 result = MU_FAILURE;
219 }
220 else
221 {
222 if (trace_log != NULL)
223 {
224 STRING_sprintf(trace_log, " | VER: %d | KEEPALIVE: %d | FLAGS:", PROTOCOL_NUMBER, mqttOptions->keepAliveInterval);
225 }
226 byteutil_writeUTF(&iterator, "MQTT", 4);
227 byteutil_writeByte(&iterator, PROTOCOL_NUMBER);
228 byteutil_writeByte(&iterator, 0); // Flags will be entered later
229 byteutil_writeInt(&iterator, mqttOptions->keepAliveInterval);
230 result = 0;
231 }
232 }
233 return result;
234}
235
236static int constructPublishVariableHeader(BUFFER_HANDLE ctrlPacket, const PUBLISH_HEADER_INFO* publishHeader, STRING_HANDLE trace_log)
237{
238 int result = 0;
239 size_t topicLen = 0;
240 size_t spaceLen = 0;
241 size_t idLen = 0;
242
243 size_t currLen = BUFFER_length(ctrlPacket);
244
245 topicLen = strlen(publishHeader->topicName);
246 spaceLen += 2;
247
248 if (publishHeader->qualityOfServiceValue != DELIVER_AT_MOST_ONCE)
249 {
250 // Packet Id is only set if the QOS is not 0
251 idLen = 2;
252 }
253
254 if (topicLen > USHRT_MAX)
255 {
256 result = MU_FAILURE;
257 }
258 else if (BUFFER_enlarge(ctrlPacket, topicLen + idLen + spaceLen) != 0)
259 {
260 result = MU_FAILURE;
261 }
262 else
263 {
264 uint8_t* iterator = BUFFER_u_char(ctrlPacket);
265 if (iterator == NULL)
266 {
267 result = MU_FAILURE;
268 }
269 else
270 {
271 iterator += currLen;
272 /* The Topic Name MUST be present as the first field in the PUBLISH Packet Variable header.It MUST be 792 a UTF-8 encoded string [MQTT-3.3.2-1] as defined in section 1.5.3.*/
273 byteutil_writeUTF(&iterator, publishHeader->topicName, (uint16_t)topicLen);
274 if (trace_log != NULL)
275 {
276 STRING_sprintf(trace_log, " | TOPIC_NAME: %s", publishHeader->topicName);
277 }
278 if (idLen > 0)
279 {
280 if (trace_log != NULL)
281 {
282 STRING_sprintf(trace_log, " | PACKET_ID: %"PRIu16, publishHeader->packetId);
283 }
284 byteutil_writeInt(&iterator, publishHeader->packetId);
285 }
286 result = 0;
287 }
288 }
289 return result;
290}
291
292static int constructSubscibeTypeVariableHeader(BUFFER_HANDLE ctrlPacket, uint16_t packetId)
293{
294 int result = 0;
295 if (BUFFER_enlarge(ctrlPacket, 2) != 0)
296 {
297 result = MU_FAILURE;
298 }
299 else
300 {
301 uint8_t* iterator = BUFFER_u_char(ctrlPacket);
302 if (iterator == NULL)
303 {
304 result = MU_FAILURE;
305 }
306 else
307 {
308 byteutil_writeInt(&iterator, packetId);
309 result = 0;
310 }
311 }
312 return result;
313}
314
315static BUFFER_HANDLE constructPublishReply(CONTROL_PACKET_TYPE type, uint8_t flags, uint16_t packetId)
316{
317 BUFFER_HANDLE result = BUFFER_new();
318 if (result != NULL)
319 {
320 if (BUFFER_pre_build(result, 4) != 0)
321 {
322 BUFFER_delete(result);
323 result = NULL;
324 }
325 else
326 {
327 uint8_t* iterator = BUFFER_u_char(result);
328 if (iterator == NULL)
329 {
330 BUFFER_delete(result);
331 result = NULL;
332 }
333 else
334 {
335 *iterator = (uint8_t)type | flags;
336 iterator++;
337 *iterator = 0x2;
338 iterator++;
339 byteutil_writeInt(&iterator, packetId);
340 }
341 }
342 }
343 return result;
344}
345
346static int constructFixedHeader(BUFFER_HANDLE ctrlPacket, CONTROL_PACKET_TYPE packetType, uint8_t flags)
347{
348 int result;
349 if (ctrlPacket == NULL)
350 {
351 return MU_FAILURE;
352 }
353 else
354 {
355 size_t packetLen = BUFFER_length(ctrlPacket);
356 uint8_t remainSize[4] ={ 0 };
357 size_t index = 0;
358
359 // Calculate the length of packet
360 do
361 {
362 uint8_t encode = packetLen % 128;
363 packetLen /= 128;
364 // if there are more data to encode, set the top bit of this byte
365 if (packetLen > 0)
366 {
367 encode |= NEXT_128_CHUNK;
368 }
369 remainSize[index++] = encode;
370 } while (packetLen > 0);
371
372 BUFFER_HANDLE fixedHeader = BUFFER_new();
373 if (fixedHeader == NULL)
374 {
375 result = MU_FAILURE;
376 }
377 else if (BUFFER_pre_build(fixedHeader, index + 1) != 0)
378 {
379 BUFFER_delete(fixedHeader);
380 result = MU_FAILURE;
381 }
382 else
383 {
384 uint8_t* iterator = BUFFER_u_char(fixedHeader);
385 *iterator = (uint8_t)packetType | flags;
386 iterator++;
387 (void)memcpy(iterator, remainSize, index);
388
389 result = BUFFER_prepend(ctrlPacket, fixedHeader);
390 BUFFER_delete(fixedHeader);
391 }
392 }
393 return result;
394}
395
396static int constructConnPayload(BUFFER_HANDLE ctrlPacket, const MQTT_CLIENT_OPTIONS* mqttOptions, STRING_HANDLE trace_log)
397{
398 int result = 0;
399 if (mqttOptions == NULL || ctrlPacket == NULL)
400 {
401 result = MU_FAILURE;
402 }
403 else
404 {
405 size_t clientLen = 0;
406 size_t usernameLen = 0;
407 size_t passwordLen = 0;
408 size_t willMessageLen = 0;
409 size_t willTopicLen = 0;
410 size_t spaceLen = 0;
411
412 if (mqttOptions->clientId != NULL)
413 {
414 spaceLen += 2;
415 clientLen = strlen(mqttOptions->clientId);
416 }
417 if (mqttOptions->username != NULL)
418 {
419 spaceLen += 2;
420 usernameLen = strlen(mqttOptions->username);
421 }
422 if (mqttOptions->password != NULL)
423 {
424 spaceLen += 2;
425 passwordLen = strlen(mqttOptions->password);
426 }
427 if (mqttOptions->willMessage != NULL)
428 {
429 spaceLen += 2;
430 willMessageLen = strlen(mqttOptions->willMessage);
431 }
432 if (mqttOptions->willTopic != NULL)
433 {
434 spaceLen += 2;
435 willTopicLen = strlen(mqttOptions->willTopic);
436 }
437
438 size_t currLen = BUFFER_length(ctrlPacket);
439 size_t totalLen = clientLen + usernameLen + passwordLen + willMessageLen + willTopicLen + spaceLen;
440
441 // Validate the Username & Password
442 if (clientLen > USHRT_MAX)
443 {
444 result = MU_FAILURE;
445 }
446 else if (usernameLen == 0 && passwordLen > 0)
447 {
448 result = MU_FAILURE;
449 }
450 else if ((willMessageLen > 0 && willTopicLen == 0) || (willTopicLen > 0 && willMessageLen == 0))
451 {
452 result = MU_FAILURE;
453 }
454 else if (BUFFER_enlarge(ctrlPacket, totalLen) != 0)
455 {
456 result = MU_FAILURE;
457 }
458 else
459 {
460 uint8_t* packet = BUFFER_u_char(ctrlPacket);
461 uint8_t* iterator = packet;
462
463 iterator += currLen;
464 byteutil_writeUTF(&iterator, mqttOptions->clientId, (uint16_t)clientLen);
465
466 // TODO: Read on the Will Topic
467 if (willMessageLen > USHRT_MAX || willTopicLen > USHRT_MAX || usernameLen > USHRT_MAX || passwordLen > USHRT_MAX)
468 {
469 result = MU_FAILURE;
470 }
471 else
472 {
473 STRING_HANDLE connect_payload_trace = NULL;
474 if (trace_log != NULL)
475 {
476 connect_payload_trace = STRING_new();
477 }
478 if (willMessageLen > 0 && willTopicLen > 0)
479 {
480 if (trace_log != NULL)
481 {
482 (void)STRING_sprintf(connect_payload_trace, " | WILL_TOPIC: %s", mqttOptions->willTopic);
483 }
484 packet[CONN_FLAG_BYTE_OFFSET] |= WILL_FLAG_FLAG;
485 byteutil_writeUTF(&iterator, mqttOptions->willTopic, (uint16_t)willTopicLen);
486 packet[CONN_FLAG_BYTE_OFFSET] |= (mqttOptions->qualityOfServiceValue << 3);
487 if (mqttOptions->messageRetain)
488 {
489 packet[CONN_FLAG_BYTE_OFFSET] |= WILL_RETAIN_FLAG;
490 }
491 byteutil_writeUTF(&iterator, mqttOptions->willMessage, (uint16_t)willMessageLen);
492 }
493 if (usernameLen > 0)
494 {
495 packet[CONN_FLAG_BYTE_OFFSET] |= USERNAME_FLAG;
496 byteutil_writeUTF(&iterator, mqttOptions->username, (uint16_t)usernameLen);
497 if (trace_log != NULL)
498 {
499 (void)STRING_sprintf(connect_payload_trace, " | USERNAME: %s", mqttOptions->username);
500 }
501 }
502 if (passwordLen > 0)
503 {
504 packet[CONN_FLAG_BYTE_OFFSET] |= PASSWORD_FLAG;
505 byteutil_writeUTF(&iterator, mqttOptions->password, (uint16_t)passwordLen);
506 if (trace_log != NULL)
507 {
508 (void)STRING_sprintf(connect_payload_trace, " | PWD: XXXX");
509 }
510 }
511 // TODO: Get the rest of the flags
512 if (trace_log != NULL)
513 {
514 (void)STRING_sprintf(connect_payload_trace, " | CLEAN: %s", mqttOptions->useCleanSession ? "1" : "0");
515 }
516 if (mqttOptions->useCleanSession)
517 {
518 packet[CONN_FLAG_BYTE_OFFSET] |= CLEAN_SESSION_FLAG;
519 }
520 if (trace_log != NULL)
521 {
522 (void)STRING_sprintf(trace_log, " %lu", packet[CONN_FLAG_BYTE_OFFSET]);
523 (void)STRING_concat_with_STRING(trace_log, connect_payload_trace);
524 STRING_delete(connect_payload_trace);
525 }
526 result = 0;
527 }
528 }
529 }
530 return result;
531}
532
533static int prepareheaderDataInfo(MQTTCODEC_INSTANCE* codecData, uint8_t remainLen)
534{
535 int result;
536 if (codecData == NULL)
537 {
538 result = MU_FAILURE;
539 }
540 else
541 {
542 result = 0;
543 codecData->storeRemainLen[codecData->remainLenIndex++] = remainLen;
544 if (remainLen <= 0x7f)
545 {
546 int multiplier = 1;
547 int totalLen = 0;
548 size_t index = 0;
549 uint8_t encodeByte = 0;
550 do
551 {
552 encodeByte = codecData->storeRemainLen[index++];
553 totalLen += (encodeByte & 127) * multiplier;
554 multiplier *= NEXT_128_CHUNK;
555
556 if (multiplier > 128 * 128 * 128)
557 {
558 result = MU_FAILURE;
559 break;
560 }
561 } while ((encodeByte & NEXT_128_CHUNK) != 0);
562
563 codecData->codecState = CODEC_STATE_VAR_HEADER;
564
565 // Reset remainLen Index
566 codecData->remainLenIndex = 0;
567 memset(codecData->storeRemainLen, 0, 4 * sizeof(uint8_t));
568
569 if (totalLen > 0)
570 {
571 codecData->bufferOffset = 0;
572 codecData->headerData = BUFFER_new();
573 if (codecData->headerData == NULL)
574 {
575 /* Codes_SRS_MQTT_CODEC_07_035: [ If any error is encountered then the packet state will be marked as error and mqtt_codec_bytesReceived shall return a non-zero value. ] */
576 LogError("Failed BUFFER_new");
577 result = MU_FAILURE;
578 }
579 else
580 {
581 if (BUFFER_pre_build(codecData->headerData, totalLen) != 0)
582 {
583 /* Codes_SRS_MQTT_CODEC_07_035: [ If any error is encountered then the packet state will be marked as error and mqtt_codec_bytesReceived shall return a non-zero value. ] */
584 LogError("Failed BUFFER_pre_build");
585 result = MU_FAILURE;
586 }
587
588 }
589 }
590 }
591 }
592 return result;
593}
594
595static void completePacketData(MQTTCODEC_INSTANCE* codecData)
596{
597 if (codecData)
598 {
599 if (codecData->packetComplete != NULL)
600 {
601 codecData->packetComplete(codecData->callContext, codecData->currPacket, codecData->headerFlags, codecData->headerData);
602 }
603
604 // Clean up data
605 codecData->currPacket = UNKNOWN_TYPE;
606 codecData->codecState = CODEC_STATE_FIXED_HEADER;
607 codecData->headerFlags = 0;
608 BUFFER_delete(codecData->headerData);
609 codecData->headerData = NULL;
610 }
611}
612
613MQTTCODEC_HANDLE mqtt_codec_create(ON_PACKET_COMPLETE_CALLBACK packetComplete, void* callbackCtx)
614{
615 MQTTCODEC_HANDLE result;
616 result = malloc(sizeof(MQTTCODEC_INSTANCE));
617 /* Codes_SRS_MQTT_CODEC_07_001: [If a failure is encountered then mqtt_codec_create shall return NULL.] */
618 if (result != NULL)
619 {
620 /* Codes_SRS_MQTT_CODEC_07_002: [On success mqtt_codec_create shall return a MQTTCODEC_HANDLE value.] */
621 result->currPacket = UNKNOWN_TYPE;
622 result->codecState = CODEC_STATE_FIXED_HEADER;
623 result->headerFlags = 0;
624 result->bufferOffset = 0;
625 result->packetComplete = packetComplete;
626 result->callContext = callbackCtx;
627 result->headerData = NULL;
628 memset(result->storeRemainLen, 0, 4 * sizeof(uint8_t));
629 result->remainLenIndex = 0;
630 }
631 return result;
632}
633
634void mqtt_codec_destroy(MQTTCODEC_HANDLE handle)
635{
636 /* Codes_SRS_MQTT_CODEC_07_003: [If the handle parameter is NULL then mqtt_codec_destroy shall do nothing.] */
637 if (handle != NULL)
638 {
639 MQTTCODEC_INSTANCE* codecData = (MQTTCODEC_INSTANCE*)handle;
640 /* Codes_SRS_MQTT_CODEC_07_004: [mqtt_codec_destroy shall deallocate all memory that has been allocated by this object.] */
641 BUFFER_delete(codecData->headerData);
642 free(codecData);
643 }
644}
645
646BUFFER_HANDLE mqtt_codec_connect(const MQTT_CLIENT_OPTIONS* mqttOptions, STRING_HANDLE trace_log)
647{
648 BUFFER_HANDLE result;
649 /* Codes_SRS_MQTT_CODEC_07_008: [If the parameters mqttOptions is NULL then mqtt_codec_connect shall return a null value.] */
650 if (mqttOptions == NULL)
651 {
652 result = NULL;
653 }
654 else
655 {
656 /* Codes_SRS_MQTT_CODEC_07_009: [mqtt_codec_connect shall construct a BUFFER_HANDLE that represents a MQTT CONNECT packet.] */
657 result = BUFFER_new();
658 if (result != NULL)
659 {
660 STRING_HANDLE varible_header_log = NULL;
661 if (trace_log != NULL)
662 {
663 varible_header_log = STRING_new();
664 }
665 // Add Variable Header Information
666 if (constructConnectVariableHeader(result, mqttOptions, varible_header_log) != 0)
667 {
668 /* Codes_SRS_MQTT_CODEC_07_010: [If any error is encountered then mqtt_codec_connect shall return NULL.] */
669 BUFFER_delete(result);
670 result = NULL;
671 }
672 else
673 {
674 if (constructConnPayload(result, mqttOptions, varible_header_log) != 0)
675 {
676 /* Codes_SRS_MQTT_CODEC_07_010: [If any error is encountered then mqtt_codec_connect shall return NULL.] */
677 BUFFER_delete(result);
678 result = NULL;
679 }
680 else
681 {
682 if (trace_log != NULL)
683 {
684 (void)STRING_copy(trace_log, "CONNECT");
685 }
686 if (constructFixedHeader(result, CONNECT_TYPE, 0) != 0)
687 {
688 /* Codes_SRS_MQTT_CODEC_07_010: [If any error is encountered then mqtt_codec_connect shall return NULL.] */
689 BUFFER_delete(result);
690 result = NULL;
691 }
692 else
693 {
694 if (trace_log != NULL)
695 {
696 (void)STRING_concat_with_STRING(trace_log, varible_header_log);
697 }
698 }
699 }
700 if (varible_header_log != NULL)
701 {
702 STRING_delete(varible_header_log);
703 }
704 }
705 }
706 }
707 return result;
708}
709
710BUFFER_HANDLE mqtt_codec_disconnect()
711{
712 /* Codes_SRS_MQTT_CODEC_07_011: [On success mqtt_codec_disconnect shall construct a BUFFER_HANDLE that represents a MQTT DISCONNECT packet.] */
713 BUFFER_HANDLE result = BUFFER_new();
714 if (result != NULL)
715 {
716 if (BUFFER_enlarge(result, 2) != 0)
717 {
718 /* Codes_SRS_MQTT_CODEC_07_012: [If any error is encountered mqtt_codec_disconnect shall return NULL.] */
719 BUFFER_delete(result);
720 result = NULL;
721 }
722 else
723 {
724 uint8_t* iterator = BUFFER_u_char(result);
725 if (iterator == NULL)
726 {
727 /* Codes_SRS_MQTT_CODEC_07_012: [If any error is encountered mqtt_codec_disconnect shall return NULL.] */
728 BUFFER_delete(result);
729 result = NULL;
730 }
731 else
732 {
733 iterator[0] = DISCONNECT_TYPE;
734 iterator[1] = 0;
735 }
736 }
737 }
738 return result;
739}
740
741BUFFER_HANDLE mqtt_codec_publish(QOS_VALUE qosValue, bool duplicateMsg, bool serverRetain, uint16_t packetId, const char* topicName, const uint8_t* msgBuffer, size_t buffLen, STRING_HANDLE trace_log)
742{
743 BUFFER_HANDLE result;
744 /* Codes_SRS_MQTT_CODEC_07_005: [If the parameters topicName is NULL then mqtt_codec_publish shall return NULL.] */
745 if (topicName == NULL)
746 {
747 result = NULL;
748 }
749 /* Codes_SRS_MQTT_CODEC_07_036: [mqtt_codec_publish shall return NULL if the buffLen variable is greater than the MAX_SEND_SIZE (0xFFFFFF7F).] */
750 else if (buffLen > MAX_SEND_SIZE)
751 {
752 /* Codes_SRS_MQTT_CODEC_07_006: [If any error is encountered then mqtt_codec_publish shall return NULL.] */
753 result = NULL;
754 }
755 else
756 {
757 PUBLISH_HEADER_INFO publishInfo ={ 0 };
758 publishInfo.topicName = topicName;
759 publishInfo.packetId = packetId;
760 publishInfo.qualityOfServiceValue = qosValue;
761
762 uint8_t headerFlags = 0;
763 if (duplicateMsg) headerFlags |= PUBLISH_DUP_FLAG;
764 if (serverRetain) headerFlags |= PUBLISH_QOS_RETAIN;
765 if (qosValue != DELIVER_AT_MOST_ONCE)
766 {
767 if (qosValue == DELIVER_AT_LEAST_ONCE)
768 {
769 headerFlags |= PUBLISH_QOS_AT_LEAST_ONCE;
770 }
771 else
772 {
773 headerFlags |= PUBLISH_QOS_EXACTLY_ONCE;
774 }
775 }
776
777 /* Codes_SRS_MQTT_CODEC_07_007: [mqtt_codec_publish shall return a BUFFER_HANDLE that represents a MQTT PUBLISH message.] */
778 result = BUFFER_new();
779 if (result != NULL)
780 {
781 STRING_HANDLE varible_header_log = NULL;
782 if (trace_log != NULL)
783 {
784 varible_header_log = STRING_construct_sprintf(" | IS_DUP: %s | RETAIN: %d | QOS: %s", duplicateMsg ? TRUE_CONST : FALSE_CONST,
785 serverRetain ? 1 : 0,
786 retrieve_qos_value(publishInfo.qualityOfServiceValue) );
787 }
788
789 if (constructPublishVariableHeader(result, &publishInfo, varible_header_log) != 0)
790 {
791 /* Codes_SRS_MQTT_CODEC_07_006: [If any error is encountered then mqtt_codec_publish shall return NULL.] */
792 BUFFER_delete(result);
793 result = NULL;
794 }
795 else
796 {
797 size_t payloadOffset = BUFFER_length(result);
798 if (buffLen > 0)
799 {
800 if (BUFFER_enlarge(result, buffLen) != 0)
801 {
802 /* Codes_SRS_MQTT_CODEC_07_006: [If any error is encountered then mqtt_codec_publish shall return NULL.] */
803 BUFFER_delete(result);
804 result = NULL;
805 }
806 else
807 {
808 uint8_t* iterator = BUFFER_u_char(result);
809 if (iterator == NULL)
810 {
811 /* Codes_SRS_MQTT_CODEC_07_006: [If any error is encountered then mqtt_codec_publish shall return NULL.] */
812 BUFFER_delete(result);
813 result = NULL;
814 }
815 else
816 {
817 iterator += payloadOffset;
818 // Write Message
819 (void)memcpy(iterator, msgBuffer, buffLen);
820 if (trace_log)
821 {
822 STRING_sprintf(varible_header_log, " | PAYLOAD_LEN: %lu", (unsigned long)buffLen);
823 }
824 }
825 }
826 }
827
828 if (result != NULL)
829 {
830 if (trace_log != NULL)
831 {
832 (void)STRING_copy(trace_log, "PUBLISH");
833 }
834 if (constructFixedHeader(result, PUBLISH_TYPE, headerFlags) != 0)
835 {
836 /* Codes_SRS_MQTT_CODEC_07_006: [If any error is encountered then mqtt_codec_publish shall return NULL.] */
837 BUFFER_delete(result);
838 result = NULL;
839 }
840 else
841 {
842 if (trace_log != NULL)
843 {
844 (void)STRING_concat_with_STRING(trace_log, varible_header_log);
845 }
846 }
847 }
848 }
849 if (varible_header_log != NULL)
850 {
851 STRING_delete(varible_header_log);
852 }
853 }
854 }
855 return result;
856}
857
858BUFFER_HANDLE mqtt_codec_publishAck(uint16_t packetId)
859{
860 /* Codes_SRS_MQTT_CODEC_07_013: [On success mqtt_codec_publishAck shall return a BUFFER_HANDLE representation of a MQTT PUBACK packet.] */
861 /* Codes_SRS_MQTT_CODEC_07_014 : [If any error is encountered then mqtt_codec_publishAck shall return NULL.] */
862 BUFFER_HANDLE result = constructPublishReply(PUBACK_TYPE, 0, packetId);
863 return result;
864}
865
866BUFFER_HANDLE mqtt_codec_publishReceived(uint16_t packetId)
867{
868 /* Codes_SRS_MQTT_CODEC_07_015: [On success mqtt_codec_publishRecieved shall return a BUFFER_HANDLE representation of a MQTT PUBREC packet.] */
869 /* Codes_SRS_MQTT_CODEC_07_016 : [If any error is encountered then mqtt_codec_publishRecieved shall return NULL.] */
870 BUFFER_HANDLE result = constructPublishReply(PUBREC_TYPE, 0, packetId);
871 return result;
872}
873
874BUFFER_HANDLE mqtt_codec_publishRelease(uint16_t packetId)
875{
876 /* Codes_SRS_MQTT_CODEC_07_017: [On success mqtt_codec_publishRelease shall return a BUFFER_HANDLE representation of a MQTT PUBREL packet.] */
877 /* Codes_SRS_MQTT_CODEC_07_018 : [If any error is encountered then mqtt_codec_publishRelease shall return NULL.] */
878 BUFFER_HANDLE result = constructPublishReply(PUBREL_TYPE, 2, packetId);
879 return result;
880}
881
882BUFFER_HANDLE mqtt_codec_publishComplete(uint16_t packetId)
883{
884 /* Codes_SRS_MQTT_CODEC_07_019: [On success mqtt_codec_publishComplete shall return a BUFFER_HANDLE representation of a MQTT PUBCOMP packet.] */
885 /* Codes_SRS_MQTT_CODEC_07_020 : [If any error is encountered then mqtt_codec_publishComplete shall return NULL.] */
886 BUFFER_HANDLE result = constructPublishReply(PUBCOMP_TYPE, 0, packetId);
887 return result;
888}
889
890BUFFER_HANDLE mqtt_codec_ping()
891{
892 /* Codes_SRS_MQTT_CODEC_07_021: [On success mqtt_codec_ping shall construct a BUFFER_HANDLE that represents a MQTT PINGREQ packet.] */
893 BUFFER_HANDLE result = BUFFER_new();
894 if (result != NULL)
895 {
896 if (BUFFER_enlarge(result, 2) != 0)
897 {
898 /* Codes_SRS_MQTT_CODEC_07_022: [If any error is encountered mqtt_codec_ping shall return NULL.] */
899 BUFFER_delete(result);
900 result = NULL;
901 }
902 else
903 {
904 uint8_t* iterator = BUFFER_u_char(result);
905 if (iterator == NULL)
906 {
907 /* Codes_SRS_MQTT_CODEC_07_022: [If any error is encountered mqtt_codec_ping shall return NULL.] */
908 BUFFER_delete(result);
909 result = NULL;
910 }
911 else
912 {
913 iterator[0] = PINGREQ_TYPE;
914 iterator[1] = 0;
915 }
916 }
917 }
918 return result;
919}
920
921BUFFER_HANDLE mqtt_codec_subscribe(uint16_t packetId, SUBSCRIBE_PAYLOAD* subscribeList, size_t count, STRING_HANDLE trace_log)
922{
923 BUFFER_HANDLE result;
924 /* Codes_SRS_MQTT_CODEC_07_023: [If the parameters subscribeList is NULL or if count is 0 then mqtt_codec_subscribe shall return NULL.] */
925 if (subscribeList == NULL || count == 0)
926 {
927 result = NULL;
928 }
929 else
930 {
931 /* Codes_SRS_MQTT_CODEC_07_026: [mqtt_codec_subscribe shall return a BUFFER_HANDLE that represents a MQTT SUBSCRIBE message.]*/
932 result = BUFFER_new();
933 if (result != NULL)
934 {
935 if (constructSubscibeTypeVariableHeader(result, packetId) != 0)
936 {
937 /* Codes_SRS_MQTT_CODEC_07_025: [If any error is encountered then mqtt_codec_subscribe shall return NULL.] */
938 BUFFER_delete(result);
939 result = NULL;
940 }
941 else
942 {
943 STRING_HANDLE sub_trace = NULL;
944 if (trace_log != NULL)
945 {
946 sub_trace = STRING_construct_sprintf(" | PACKET_ID: %"PRIu16, packetId);
947 }
948 /* Codes_SRS_MQTT_CODEC_07_024: [mqtt_codec_subscribe shall iterate through count items in the subscribeList.] */
949 if (addListItemsToSubscribePacket(result, subscribeList, count, sub_trace) != 0)
950 {
951 /* Codes_SRS_MQTT_CODEC_07_025: [If any error is encountered then mqtt_codec_subscribe shall return NULL.] */
952 BUFFER_delete(result);
953 result = NULL;
954 }
955 else
956 {
957
958 if (trace_log != NULL)
959 {
960 STRING_concat(trace_log, "SUBSCRIBE");
961 }
962 if (constructFixedHeader(result, SUBSCRIBE_TYPE, SUBSCRIBE_FIXED_HEADER_FLAG) != 0)
963 {
964 /* Codes_SRS_MQTT_CODEC_07_025: [If any error is encountered then mqtt_codec_subscribe shall return NULL.] */
965 BUFFER_delete(result);
966 result = NULL;
967 }
968 else
969 {
970 if (trace_log != NULL)
971 {
972 (void)STRING_concat_with_STRING(trace_log, sub_trace);
973 }
974 }
975 }
976 if (sub_trace != NULL)
977 {
978 STRING_delete(sub_trace);
979 }
980 }
981 }
982 }
983 return result;
984}
985
986BUFFER_HANDLE mqtt_codec_unsubscribe(uint16_t packetId, const char** unsubscribeList, size_t count, STRING_HANDLE trace_log)
987{
988 BUFFER_HANDLE result;
989 /* Codes_SRS_MQTT_CODEC_07_027: [If the parameters unsubscribeList is NULL or if count is 0 then mqtt_codec_unsubscribe shall return NULL.] */
990 if (unsubscribeList == NULL || count == 0)
991 {
992 result = NULL;
993 }
994 else
995 {
996 /* Codes_SRS_MQTT_CODEC_07_030: [mqtt_codec_unsubscribe shall return a BUFFER_HANDLE that represents a MQTT SUBSCRIBE message.] */
997 result = BUFFER_new();
998 if (result != NULL)
999 {
1000 if (constructSubscibeTypeVariableHeader(result, packetId) != 0)
1001 {
1002 /* Codes_SRS_MQTT_CODEC_07_029: [If any error is encountered then mqtt_codec_unsubscribe shall return NULL.] */
1003 BUFFER_delete(result);
1004 result = NULL;
1005 }
1006 else
1007 {
1008 STRING_HANDLE unsub_trace = NULL;
1009 if (trace_log != NULL)
1010 {
1011 unsub_trace = STRING_construct_sprintf(" | PACKET_ID: %"PRIu16, packetId);
1012 }
1013 /* Codes_SRS_MQTT_CODEC_07_028: [mqtt_codec_unsubscribe shall iterate through count items in the unsubscribeList.] */
1014 if (addListItemsToUnsubscribePacket(result, unsubscribeList, count, unsub_trace) != 0)
1015 {
1016 /* Codes_SRS_MQTT_CODEC_07_029: [If any error is encountered then mqtt_codec_unsubscribe shall return NULL.] */
1017 BUFFER_delete(result);
1018 result = NULL;
1019 }
1020 else
1021 {
1022 if (trace_log != NULL)
1023 {
1024 (void)STRING_copy(trace_log, "UNSUBSCRIBE");
1025 }
1026 if (constructFixedHeader(result, UNSUBSCRIBE_TYPE, UNSUBSCRIBE_FIXED_HEADER_FLAG) != 0)
1027 {
1028 /* Codes_SRS_MQTT_CODEC_07_029: [If any error is encountered then mqtt_codec_unsubscribe shall return NULL.] */
1029 BUFFER_delete(result);
1030 result = NULL;
1031 }
1032 else
1033 {
1034 if (trace_log != NULL)
1035 {
1036 (void)STRING_concat_with_STRING(trace_log, unsub_trace);
1037 }
1038 }
1039 }
1040 if (unsub_trace != NULL)
1041 {
1042 STRING_delete(unsub_trace);
1043 }
1044 }
1045 }
1046 }
1047 return result;
1048}
1049
1050int mqtt_codec_bytesReceived(MQTTCODEC_HANDLE handle, const unsigned char* buffer, size_t size)
1051{
1052 int result;
1053 MQTTCODEC_INSTANCE* codec_Data = (MQTTCODEC_INSTANCE*)handle;
1054 /* Codes_SRS_MQTT_CODEC_07_031: [If the parameters handle or buffer is NULL then mqtt_codec_bytesReceived shall return a non-zero value.] */
1055 if (codec_Data == NULL)
1056 {
1057 result = MU_FAILURE;
1058 }
1059 /* Codes_SRS_MQTT_CODEC_07_031: [If the parameters handle or buffer is NULL then mqtt_codec_bytesReceived shall return a non-zero value.] */
1060 /* Codes_SRS_MQTT_CODEC_07_032: [If the parameters size is zero then mqtt_codec_bytesReceived shall return a non-zero value.] */
1061 else if (buffer == NULL || size == 0)
1062 {
1063 codec_Data->currPacket = PACKET_TYPE_ERROR;
1064 result = MU_FAILURE;
1065 }
1066 else
1067 {
1068 /* Codes_SRS_MQTT_CODEC_07_033: [mqtt_codec_bytesReceived constructs a sequence of bytes into the corresponding MQTT packets and on success returns zero.] */
1069 result = 0;
1070 size_t index = 0;
1071 for (index = 0; index < size && result == 0; index++)
1072 {
1073 uint8_t iterator = ((int8_t*)buffer)[index];
1074 if (codec_Data->codecState == CODEC_STATE_FIXED_HEADER)
1075 {
1076 if (codec_Data->currPacket == UNKNOWN_TYPE)
1077 {
1078 codec_Data->currPacket = processControlPacketType(iterator, &codec_Data->headerFlags);
1079 }
1080 else
1081 {
1082 if (prepareheaderDataInfo(codec_Data, iterator) != 0)
1083 {
1084 /* Codes_SRS_MQTT_CODEC_07_035: [If any error is encountered then the packet state will be marked as error and mqtt_codec_bytesReceived shall return a non-zero value.] */
1085 codec_Data->currPacket = PACKET_TYPE_ERROR;
1086 result = MU_FAILURE;
1087 }
1088 if (codec_Data->currPacket == PINGRESP_TYPE)
1089 {
1090 /* Codes_SRS_MQTT_CODEC_07_034: [Upon a constructing a complete MQTT packet mqtt_codec_bytesReceived shall call the ON_PACKET_COMPLETE_CALLBACK function.] */
1091 completePacketData(codec_Data);
1092 }
1093 }
1094 }
1095 else if (codec_Data->codecState == CODEC_STATE_VAR_HEADER)
1096 {
1097 if (codec_Data->headerData == NULL)
1098 {
1099 codec_Data->codecState = CODEC_STATE_PAYLOAD;
1100 }
1101 else
1102 {
1103 uint8_t* dataBytes = BUFFER_u_char(codec_Data->headerData);
1104 if (dataBytes == NULL)
1105 {
1106 /* Codes_SRS_MQTT_CODEC_07_035: [If any error is encountered then the packet state will be marked as error and mqtt_codec_bytesReceived shall return a non-zero value.] */
1107 codec_Data->currPacket = PACKET_TYPE_ERROR;
1108 result = MU_FAILURE;
1109 }
1110 else
1111 {
1112 // Increment the data
1113 dataBytes += codec_Data->bufferOffset++;
1114 *dataBytes = iterator;
1115
1116 size_t totalLen = BUFFER_length(codec_Data->headerData);
1117 if (codec_Data->bufferOffset >= totalLen)
1118 {
1119 /* Codes_SRS_MQTT_CODEC_07_034: [Upon a constructing a complete MQTT packet mqtt_codec_bytesReceived shall call the ON_PACKET_COMPLETE_CALLBACK function.] */
1120 completePacketData(codec_Data);
1121 }
1122 }
1123 }
1124 }
1125 else
1126 {
1127 /* Codes_SRS_MQTT_CODEC_07_035: [If any error is encountered then the packet state will be marked as error and mqtt_codec_bytesReceived shall return a non-zero value.] */
1128 codec_Data->currPacket = PACKET_TYPE_ERROR;
1129 result = MU_FAILURE;
1130 }
1131 }
1132 }
1133 return result;
1134}
/* Note: See TracBrowser for help on using the repository browser. */