source: azure_iot_hub/trunk/azure_iothub/umqtt/src/mqtt_codec.c@ 399

Last change on this file since 399 was 399, checked in by coas-nagasima, 5 years ago

更新

  • Property svn:eol-style set to native
  • Property svn:mime-type set to text/x-csrc;charset=UTF-8
File size: 40.7 KB
Line 
1// Copyright (c) Microsoft. All rights reserved.
2// Licensed under the MIT license. See LICENSE file in the project root for full license information.
3
4#include <stdlib.h>
5#include <limits.h>
6#include "azure_c_shared_utility/optimize_size.h"
7#include "azure_c_shared_utility/gballoc.h"
8#include "azure_c_shared_utility/buffer_.h"
9#include "azure_c_shared_utility/strings.h"
10#include "azure_c_shared_utility/macro_utils.h"
11#include "azure_c_shared_utility/xlogging.h"
12#include "azure_umqtt_c/mqtt_codec.h"
13#include <inttypes.h>
14
15#define PAYLOAD_OFFSET 5
16#define PACKET_TYPE_BYTE(p) (CONTROL_PACKET_TYPE)((uint8_t)(((uint8_t)(p)) & 0xf0))
17#define FLAG_VALUE_BYTE(p) ((uint8_t)(((uint8_t)(p)) & 0xf))
18
19#define USERNAME_FLAG 0x80
20#define PASSWORD_FLAG 0x40
21#define WILL_RETAIN_FLAG 0x20
22#define WILL_QOS_FLAG_ 0x18
23#define WILL_FLAG_FLAG 0x04
24#define CLEAN_SESSION_FLAG 0x02
25
26#define NEXT_128_CHUNK 0x80
27#define PUBLISH_DUP_FLAG 0x8
28#define PUBLISH_QOS_EXACTLY_ONCE 0x4
29#define PUBLISH_QOS_AT_LEAST_ONCE 0x2
30#define PUBLISH_QOS_RETAIN 0x1
31
32#define PROTOCOL_NUMBER 4
33#define CONN_FLAG_BYTE_OFFSET 7
34
35#define CONNECT_FIXED_HEADER_SIZE 2
36#define CONNECT_VARIABLE_HEADER_SIZE 10
37#define SUBSCRIBE_FIXED_HEADER_FLAG 0x2
38#define UNSUBSCRIBE_FIXED_HEADER_FLAG 0x2
39
40#define MAX_SEND_SIZE 0xFFFFFF7F // 268435455
41
42// This captures the maximum packet size for 3 digits.
43// If it's above this value then we bail out of the loop
44#define MAX_3_DIGIT_PACKET_SIZE 2097152
45
46#define CODEC_STATE_VALUES \
47 CODEC_STATE_FIXED_HEADER, \
48 CODEC_STATE_VAR_HEADER, \
49 CODEC_STATE_PAYLOAD
50
51static const char* const TRUE_CONST = "true";
52static const char* const FALSE_CONST = "false";
53
54MU_DEFINE_ENUM(CODEC_STATE_RESULT, CODEC_STATE_VALUES);
55
56typedef struct MQTTCODEC_INSTANCE_TAG
57{
58 CONTROL_PACKET_TYPE currPacket;
59 CODEC_STATE_RESULT codecState;
60 size_t bufferOffset;
61 int headerFlags;
62 BUFFER_HANDLE headerData;
63 ON_PACKET_COMPLETE_CALLBACK packetComplete;
64 void* callContext;
65 uint8_t storeRemainLen[4];
66 size_t remainLenIndex;
67} MQTTCODEC_INSTANCE;
68
69typedef struct PUBLISH_HEADER_INFO_TAG
70{
71 const char* topicName;
72 uint16_t packetId;
73 const char* msgBuffer;
74 QOS_VALUE qualityOfServiceValue;
75} PUBLISH_HEADER_INFO;
76
77static const char* retrieve_qos_value(QOS_VALUE value)
78{
79 switch (value)
80 {
81 case DELIVER_AT_MOST_ONCE:
82 return "DELIVER_AT_MOST_ONCE";
83 case DELIVER_AT_LEAST_ONCE:
84 return "DELIVER_AT_LEAST_ONCE";
85 case DELIVER_EXACTLY_ONCE:
86 default:
87 return "DELIVER_EXACTLY_ONCE";
88 }
89}
90
91static void byteutil_writeByte(uint8_t** buffer, uint8_t value)
92{
93 if (buffer != NULL)
94 {
95 **buffer = value;
96 (*buffer)++;
97 }
98}
99
100static void byteutil_writeInt(uint8_t** buffer, uint16_t value)
101{
102 if (buffer != NULL)
103 {
104 **buffer = (char)(value / 256);
105 (*buffer)++;
106 **buffer = (char)(value % 256);
107 (*buffer)++;
108 }
109}
110
111static void byteutil_writeUTF(uint8_t** buffer, const char* stringData, uint16_t len)
112{
113 if (buffer != NULL)
114 {
115 byteutil_writeInt(buffer, len);
116 (void)memcpy(*buffer, stringData, len);
117 *buffer += len;
118 }
119}
120
121static CONTROL_PACKET_TYPE processControlPacketType(uint8_t pktByte, int* flags)
122{
123 CONTROL_PACKET_TYPE result;
124 result = PACKET_TYPE_BYTE(pktByte);
125 if (flags != NULL)
126 {
127 *flags = FLAG_VALUE_BYTE(pktByte);
128 }
129 return result;
130}
131
132static int addListItemsToUnsubscribePacket(BUFFER_HANDLE ctrlPacket, const char** payloadList, size_t payloadCount, STRING_HANDLE trace_log)
133{
134 int result = 0;
135 if (payloadList == NULL || ctrlPacket == NULL)
136 {
137 result = MU_FAILURE;
138 }
139 else
140 {
141 size_t index = 0;
142 for (index = 0; index < payloadCount && result == 0; index++)
143 {
144 // Add the Payload
145 size_t offsetLen = BUFFER_length(ctrlPacket);
146 size_t topicLen = strlen(payloadList[index]);
147 if (topicLen > USHRT_MAX)
148 {
149 result = MU_FAILURE;
150 }
151 else if (BUFFER_enlarge(ctrlPacket, topicLen + 2) != 0)
152 {
153 result = MU_FAILURE;
154 }
155 else
156 {
157 uint8_t* iterator = BUFFER_u_char(ctrlPacket);
158 iterator += offsetLen;
159 byteutil_writeUTF(&iterator, payloadList[index], (uint16_t)topicLen);
160 }
161 if (trace_log != NULL)
162 {
163 STRING_sprintf(trace_log, " | TOPIC_NAME: %s", payloadList[index]);
164 }
165 }
166 }
167 return result;
168}
169
170static int addListItemsToSubscribePacket(BUFFER_HANDLE ctrlPacket, SUBSCRIBE_PAYLOAD* payloadList, size_t payloadCount, STRING_HANDLE trace_log)
171{
172 int result = 0;
173 if (payloadList == NULL || ctrlPacket == NULL)
174 {
175 result = MU_FAILURE;
176 }
177 else
178 {
179 size_t index = 0;
180 for (index = 0; index < payloadCount && result == 0; index++)
181 {
182 // Add the Payload
183 size_t offsetLen = BUFFER_length(ctrlPacket);
184 size_t topicLen = strlen(payloadList[index].subscribeTopic);
185 if (topicLen > USHRT_MAX)
186 {
187 result = MU_FAILURE;
188 }
189 else if (BUFFER_enlarge(ctrlPacket, topicLen + 2 + 1) != 0)
190 {
191 result = MU_FAILURE;
192 }
193 else
194 {
195 uint8_t* iterator = BUFFER_u_char(ctrlPacket);
196 iterator += offsetLen;
197 byteutil_writeUTF(&iterator, payloadList[index].subscribeTopic, (uint16_t)topicLen);
198 *iterator = payloadList[index].qosReturn;
199
200 if (trace_log != NULL)
201 {
202 STRING_sprintf(trace_log, " | TOPIC_NAME: %s | QOS: %d", payloadList[index].subscribeTopic, (int)payloadList[index].qosReturn);
203 }
204 }
205 }
206 }
207 return result;
208}
209
210static int constructConnectVariableHeader(BUFFER_HANDLE ctrlPacket, const MQTT_CLIENT_OPTIONS* mqttOptions, STRING_HANDLE trace_log)
211{
212 int result = 0;
213 if (BUFFER_enlarge(ctrlPacket, CONNECT_VARIABLE_HEADER_SIZE) != 0)
214 {
215 result = MU_FAILURE;
216 }
217 else
218 {
219 uint8_t* iterator = BUFFER_u_char(ctrlPacket);
220 if (iterator == NULL)
221 {
222 result = MU_FAILURE;
223 }
224 else
225 {
226 if (trace_log != NULL)
227 {
228 STRING_sprintf(trace_log, " | VER: %d | KEEPALIVE: %d | FLAGS:", PROTOCOL_NUMBER, mqttOptions->keepAliveInterval);
229 }
230 byteutil_writeUTF(&iterator, "MQTT", 4);
231 byteutil_writeByte(&iterator, PROTOCOL_NUMBER);
232 byteutil_writeByte(&iterator, 0); // Flags will be entered later
233 byteutil_writeInt(&iterator, mqttOptions->keepAliveInterval);
234 result = 0;
235 }
236 }
237 return result;
238}
239
240static int constructPublishVariableHeader(BUFFER_HANDLE ctrlPacket, const PUBLISH_HEADER_INFO* publishHeader, STRING_HANDLE trace_log)
241{
242 int result = 0;
243 size_t topicLen = 0;
244 size_t spaceLen = 0;
245 size_t idLen = 0;
246
247 size_t currLen = BUFFER_length(ctrlPacket);
248
249 topicLen = strlen(publishHeader->topicName);
250 spaceLen += 2;
251
252 if (publishHeader->qualityOfServiceValue != DELIVER_AT_MOST_ONCE)
253 {
254 // Packet Id is only set if the QOS is not 0
255 idLen = 2;
256 }
257
258 if (topicLen > USHRT_MAX)
259 {
260 result = MU_FAILURE;
261 }
262 else if (BUFFER_enlarge(ctrlPacket, topicLen + idLen + spaceLen) != 0)
263 {
264 result = MU_FAILURE;
265 }
266 else
267 {
268 uint8_t* iterator = BUFFER_u_char(ctrlPacket);
269 if (iterator == NULL)
270 {
271 result = MU_FAILURE;
272 }
273 else
274 {
275 iterator += currLen;
276 /* The Topic Name MUST be present as the first field in the PUBLISH Packet Variable header.It MUST be 792 a UTF-8 encoded string [MQTT-3.3.2-1] as defined in section 1.5.3.*/
277 byteutil_writeUTF(&iterator, publishHeader->topicName, (uint16_t)topicLen);
278 if (trace_log != NULL)
279 {
280 STRING_sprintf(trace_log, " | TOPIC_NAME: %s", publishHeader->topicName);
281 }
282 if (idLen > 0)
283 {
284 if (trace_log != NULL)
285 {
286 STRING_sprintf(trace_log, " | PACKET_ID: %"PRIu16, publishHeader->packetId);
287 }
288 byteutil_writeInt(&iterator, publishHeader->packetId);
289 }
290 result = 0;
291 }
292 }
293 return result;
294}
295
296static int constructSubscibeTypeVariableHeader(BUFFER_HANDLE ctrlPacket, uint16_t packetId)
297{
298 int result = 0;
299 if (BUFFER_enlarge(ctrlPacket, 2) != 0)
300 {
301 result = MU_FAILURE;
302 }
303 else
304 {
305 uint8_t* iterator = BUFFER_u_char(ctrlPacket);
306 if (iterator == NULL)
307 {
308 result = MU_FAILURE;
309 }
310 else
311 {
312 byteutil_writeInt(&iterator, packetId);
313 result = 0;
314 }
315 }
316 return result;
317}
318
319static BUFFER_HANDLE constructPublishReply(CONTROL_PACKET_TYPE type, uint8_t flags, uint16_t packetId)
320{
321 BUFFER_HANDLE result = BUFFER_new();
322 if (result != NULL)
323 {
324 if (BUFFER_pre_build(result, 4) != 0)
325 {
326 BUFFER_delete(result);
327 result = NULL;
328 }
329 else
330 {
331 uint8_t* iterator = BUFFER_u_char(result);
332 if (iterator == NULL)
333 {
334 BUFFER_delete(result);
335 result = NULL;
336 }
337 else
338 {
339 *iterator = (uint8_t)type | flags;
340 iterator++;
341 *iterator = 0x2;
342 iterator++;
343 byteutil_writeInt(&iterator, packetId);
344 }
345 }
346 }
347 return result;
348}
349
350static int constructFixedHeader(BUFFER_HANDLE ctrlPacket, CONTROL_PACKET_TYPE packetType, uint8_t flags)
351{
352 int result;
353 if (ctrlPacket == NULL)
354 {
355 return MU_FAILURE;
356 }
357 else
358 {
359 size_t packetLen = BUFFER_length(ctrlPacket);
360 uint8_t remainSize[4] ={ 0 };
361 size_t index = 0;
362
363 // Calculate the length of packet
364 do
365 {
366 uint8_t encode = packetLen % 128;
367 packetLen /= 128;
368 // if there are more data to encode, set the top bit of this byte
369 if (packetLen > 0)
370 {
371 encode |= NEXT_128_CHUNK;
372 }
373 remainSize[index++] = encode;
374 } while (packetLen > 0);
375
376 BUFFER_HANDLE fixedHeader = BUFFER_new();
377 if (fixedHeader == NULL)
378 {
379 result = MU_FAILURE;
380 }
381 else if (BUFFER_pre_build(fixedHeader, index + 1) != 0)
382 {
383 BUFFER_delete(fixedHeader);
384 result = MU_FAILURE;
385 }
386 else
387 {
388 uint8_t* iterator = BUFFER_u_char(fixedHeader);
389 *iterator = (uint8_t)packetType | flags;
390 iterator++;
391 (void)memcpy(iterator, remainSize, index);
392
393 result = BUFFER_prepend(ctrlPacket, fixedHeader);
394 BUFFER_delete(fixedHeader);
395 }
396 }
397 return result;
398}
399
400static int constructConnPayload(BUFFER_HANDLE ctrlPacket, const MQTT_CLIENT_OPTIONS* mqttOptions, STRING_HANDLE trace_log)
401{
402 int result = 0;
403 if (mqttOptions == NULL || ctrlPacket == NULL)
404 {
405 result = MU_FAILURE;
406 }
407 else
408 {
409 size_t clientLen = 0;
410 size_t usernameLen = 0;
411 size_t passwordLen = 0;
412 size_t willMessageLen = 0;
413 size_t willTopicLen = 0;
414 size_t spaceLen = 0;
415 size_t currLen = 0;
416 size_t totalLen = 0;
417
418 if (mqttOptions->clientId != NULL)
419 {
420 spaceLen += 2;
421 clientLen = strlen(mqttOptions->clientId);
422 }
423 if (mqttOptions->username != NULL)
424 {
425 spaceLen += 2;
426 usernameLen = strlen(mqttOptions->username);
427 }
428 if (mqttOptions->password != NULL)
429 {
430 spaceLen += 2;
431 passwordLen = strlen(mqttOptions->password);
432 }
433 if (mqttOptions->willMessage != NULL)
434 {
435 spaceLen += 2;
436 willMessageLen = strlen(mqttOptions->willMessage);
437 }
438 if (mqttOptions->willTopic != NULL)
439 {
440 spaceLen += 2;
441 willTopicLen = strlen(mqttOptions->willTopic);
442 }
443
444 currLen = BUFFER_length(ctrlPacket);
445 totalLen = clientLen + usernameLen + passwordLen + willMessageLen + willTopicLen + spaceLen;
446
447 // Validate the Username & Password
448 if (clientLen > USHRT_MAX)
449 {
450 result = MU_FAILURE;
451 }
452 else if (usernameLen == 0 && passwordLen > 0)
453 {
454 result = MU_FAILURE;
455 }
456 else if ((willMessageLen > 0 && willTopicLen == 0) || (willTopicLen > 0 && willMessageLen == 0))
457 {
458 result = MU_FAILURE;
459 }
460 else if (BUFFER_enlarge(ctrlPacket, totalLen) != 0)
461 {
462 result = MU_FAILURE;
463 }
464 else
465 {
466 uint8_t* packet = BUFFER_u_char(ctrlPacket);
467 uint8_t* iterator = packet;
468
469 iterator += currLen;
470 byteutil_writeUTF(&iterator, mqttOptions->clientId, (uint16_t)clientLen);
471
472 // TODO: Read on the Will Topic
473 if (willMessageLen > USHRT_MAX || willTopicLen > USHRT_MAX || usernameLen > USHRT_MAX || passwordLen > USHRT_MAX)
474 {
475 result = MU_FAILURE;
476 }
477 else
478 {
479 STRING_HANDLE connect_payload_trace = NULL;
480 if (trace_log != NULL)
481 {
482 connect_payload_trace = STRING_new();
483 }
484 if (willMessageLen > 0 && willTopicLen > 0)
485 {
486 if (trace_log != NULL)
487 {
488 (void)STRING_sprintf(connect_payload_trace, " | WILL_TOPIC: %s", mqttOptions->willTopic);
489 }
490 packet[CONN_FLAG_BYTE_OFFSET] |= WILL_FLAG_FLAG;
491 byteutil_writeUTF(&iterator, mqttOptions->willTopic, (uint16_t)willTopicLen);
492 packet[CONN_FLAG_BYTE_OFFSET] |= (mqttOptions->qualityOfServiceValue << 3);
493 if (mqttOptions->messageRetain)
494 {
495 packet[CONN_FLAG_BYTE_OFFSET] |= WILL_RETAIN_FLAG;
496 }
497 byteutil_writeUTF(&iterator, mqttOptions->willMessage, (uint16_t)willMessageLen);
498 }
499 if (usernameLen > 0)
500 {
501 packet[CONN_FLAG_BYTE_OFFSET] |= USERNAME_FLAG;
502 byteutil_writeUTF(&iterator, mqttOptions->username, (uint16_t)usernameLen);
503 if (trace_log != NULL)
504 {
505 (void)STRING_sprintf(connect_payload_trace, " | USERNAME: %s", mqttOptions->username);
506 }
507 }
508 if (passwordLen > 0)
509 {
510 packet[CONN_FLAG_BYTE_OFFSET] |= PASSWORD_FLAG;
511 byteutil_writeUTF(&iterator, mqttOptions->password, (uint16_t)passwordLen);
512 if (trace_log != NULL)
513 {
514 (void)STRING_sprintf(connect_payload_trace, " | PWD: XXXX");
515 //(void)STRING_sprintf(connect_payload_trace, " | PWD: %s", mqttOptions->password);
516 }
517 }
518 // TODO: Get the rest of the flags
519 if (trace_log != NULL)
520 {
521 (void)STRING_sprintf(connect_payload_trace, " | CLEAN: %s", mqttOptions->useCleanSession ? "1" : "0");
522 }
523 if (mqttOptions->useCleanSession)
524 {
525 packet[CONN_FLAG_BYTE_OFFSET] |= CLEAN_SESSION_FLAG;
526 }
527 if (trace_log != NULL)
528 {
529 (void)STRING_sprintf(trace_log, " %lu", packet[CONN_FLAG_BYTE_OFFSET]);
530 (void)STRING_concat_with_STRING(trace_log, connect_payload_trace);
531 STRING_delete(connect_payload_trace);
532 }
533 result = 0;
534 }
535 }
536 }
537 return result;
538}
539
540static int prepareheaderDataInfo(MQTTCODEC_INSTANCE* codecData, uint8_t remainLen)
541{
542 int result;
543 if (codecData == NULL)
544 {
545 result = MU_FAILURE;
546 }
547 else
548 {
549 result = 0;
550 codecData->storeRemainLen[codecData->remainLenIndex++] = remainLen;
551 if (remainLen <= 0x7f)
552 {
553 int multiplier = 1;
554 int totalLen = 0;
555 size_t index = 0;
556 uint8_t encodeByte = 0;
557 do
558 {
559 encodeByte = codecData->storeRemainLen[index++];
560 totalLen += (encodeByte & 127) * multiplier;
561 multiplier *= NEXT_128_CHUNK;
562
563 if (multiplier > MAX_3_DIGIT_PACKET_SIZE)
564 {
565 result = MU_FAILURE;
566 break;
567 }
568 } while ((encodeByte & NEXT_128_CHUNK) != 0);
569
570 if (result != 0 || totalLen > MAX_SEND_SIZE)
571 {
572 LogError("Receive buffer too large for MQTT packet");
573 result = MU_FAILURE;
574 }
575 else
576 {
577 codecData->codecState = CODEC_STATE_VAR_HEADER;
578
579 // Reset remainLen Index
580 codecData->remainLenIndex = 0;
581 memset(codecData->storeRemainLen, 0, 4 * sizeof(uint8_t));
582
583 if (totalLen > 0)
584 {
585 codecData->bufferOffset = 0;
586 codecData->headerData = BUFFER_new();
587 if (codecData->headerData == NULL)
588 {
589 /* Codes_SRS_MQTT_CODEC_07_035: [ If any error is encountered then the packet state will be marked as error and mqtt_codec_bytesReceived shall return a non-zero value. ] */
590 LogError("Failed BUFFER_new");
591 result = MU_FAILURE;
592 }
593 else
594 {
595 if (BUFFER_pre_build(codecData->headerData, totalLen) != 0)
596 {
597 /* Codes_SRS_MQTT_CODEC_07_035: [ If any error is encountered then the packet state will be marked as error and mqtt_codec_bytesReceived shall return a non-zero value. ] */
598 LogError("Failed BUFFER_pre_build");
599 result = MU_FAILURE;
600 }
601
602 }
603 }
604 }
605 }
606 }
607 return result;
608}
609
610static void completePacketData(MQTTCODEC_INSTANCE* codecData)
611{
612 if (codecData)
613 {
614 if (codecData->packetComplete != NULL)
615 {
616 codecData->packetComplete(codecData->callContext, codecData->currPacket, codecData->headerFlags, codecData->headerData);
617 }
618
619 // Clean up data
620 codecData->currPacket = UNKNOWN_TYPE;
621 codecData->codecState = CODEC_STATE_FIXED_HEADER;
622 codecData->headerFlags = 0;
623 BUFFER_delete(codecData->headerData);
624 codecData->headerData = NULL;
625 }
626}
627
628static void clear_codec_data(MQTTCODEC_INSTANCE* codec_data)
629{
630 // Clear the code instance data
631 codec_data->currPacket = UNKNOWN_TYPE;
632 codec_data->codecState = CODEC_STATE_FIXED_HEADER;
633 codec_data->headerFlags = 0;
634 codec_data->bufferOffset = 0;
635 codec_data->headerData = NULL;
636 memset(codec_data->storeRemainLen, 0, 4 * sizeof(uint8_t));
637 codec_data->remainLenIndex = 0;
638}
639
640void mqtt_codec_reset(MQTTCODEC_HANDLE handle)
641{
642 if (handle != NULL)
643 {
644 clear_codec_data(handle);
645 }
646}
647
648MQTTCODEC_HANDLE mqtt_codec_create(ON_PACKET_COMPLETE_CALLBACK packetComplete, void* callbackCtx)
649{
650 MQTTCODEC_HANDLE result;
651 result = malloc(sizeof(MQTTCODEC_INSTANCE));
652 /* Codes_SRS_MQTT_CODEC_07_001: [If a failure is encountered then mqtt_codec_create shall return NULL.] */
653 if (result != NULL)
654 {
655 /* Codes_SRS_MQTT_CODEC_07_002: [On success mqtt_codec_create shall return a MQTTCODEC_HANDLE value.] */
656 clear_codec_data(result);
657 result->packetComplete = packetComplete;
658 result->callContext = callbackCtx;
659 }
660 return result;
661}
662
663void mqtt_codec_destroy(MQTTCODEC_HANDLE handle)
664{
665 /* Codes_SRS_MQTT_CODEC_07_003: [If the handle parameter is NULL then mqtt_codec_destroy shall do nothing.] */
666 if (handle != NULL)
667 {
668 MQTTCODEC_INSTANCE* codecData = (MQTTCODEC_INSTANCE*)handle;
669 /* Codes_SRS_MQTT_CODEC_07_004: [mqtt_codec_destroy shall deallocate all memory that has been allocated by this object.] */
670 BUFFER_delete(codecData->headerData);
671 free(codecData);
672 }
673}
674
675BUFFER_HANDLE mqtt_codec_connect(const MQTT_CLIENT_OPTIONS* mqttOptions, STRING_HANDLE trace_log)
676{
677 BUFFER_HANDLE result;
678 /* Codes_SRS_MQTT_CODEC_07_008: [If the parameters mqttOptions is NULL then mqtt_codec_connect shall return a null value.] */
679 if (mqttOptions == NULL)
680 {
681 result = NULL;
682 }
683 else
684 {
685 /* Codes_SRS_MQTT_CODEC_07_009: [mqtt_codec_connect shall construct a BUFFER_HANDLE that represents a MQTT CONNECT packet.] */
686 result = BUFFER_new();
687 if (result != NULL)
688 {
689 STRING_HANDLE varible_header_log = NULL;
690 if (trace_log != NULL)
691 {
692 varible_header_log = STRING_new();
693 }
694 // Add Variable Header Information
695 if (constructConnectVariableHeader(result, mqttOptions, varible_header_log) != 0)
696 {
697 /* Codes_SRS_MQTT_CODEC_07_010: [If any error is encountered then mqtt_codec_connect shall return NULL.] */
698 BUFFER_delete(result);
699 result = NULL;
700 }
701 else
702 {
703 if (constructConnPayload(result, mqttOptions, varible_header_log) != 0)
704 {
705 /* Codes_SRS_MQTT_CODEC_07_010: [If any error is encountered then mqtt_codec_connect shall return NULL.] */
706 BUFFER_delete(result);
707 result = NULL;
708 }
709 else
710 {
711 if (trace_log != NULL)
712 {
713 (void)STRING_copy(trace_log, "CONNECT");
714 }
715 if (constructFixedHeader(result, CONNECT_TYPE, 0) != 0)
716 {
717 /* Codes_SRS_MQTT_CODEC_07_010: [If any error is encountered then mqtt_codec_connect shall return NULL.] */
718 BUFFER_delete(result);
719 result = NULL;
720 }
721 else
722 {
723 if (trace_log != NULL)
724 {
725 (void)STRING_concat_with_STRING(trace_log, varible_header_log);
726 }
727 }
728 }
729 if (varible_header_log != NULL)
730 {
731 STRING_delete(varible_header_log);
732 }
733 }
734 }
735 }
736 return result;
737}
738
739BUFFER_HANDLE mqtt_codec_disconnect()
740{
741 /* Codes_SRS_MQTT_CODEC_07_011: [On success mqtt_codec_disconnect shall construct a BUFFER_HANDLE that represents a MQTT DISCONNECT packet.] */
742 BUFFER_HANDLE result = BUFFER_new();
743 if (result != NULL)
744 {
745 if (BUFFER_enlarge(result, 2) != 0)
746 {
747 /* Codes_SRS_MQTT_CODEC_07_012: [If any error is encountered mqtt_codec_disconnect shall return NULL.] */
748 BUFFER_delete(result);
749 result = NULL;
750 }
751 else
752 {
753 uint8_t* iterator = BUFFER_u_char(result);
754 if (iterator == NULL)
755 {
756 /* Codes_SRS_MQTT_CODEC_07_012: [If any error is encountered mqtt_codec_disconnect shall return NULL.] */
757 BUFFER_delete(result);
758 result = NULL;
759 }
760 else
761 {
762 iterator[0] = DISCONNECT_TYPE;
763 iterator[1] = 0;
764 }
765 }
766 }
767 return result;
768}
769
770BUFFER_HANDLE mqtt_codec_publish(QOS_VALUE qosValue, bool duplicateMsg, bool serverRetain, uint16_t packetId, const char* topicName, const uint8_t* msgBuffer, size_t buffLen, STRING_HANDLE trace_log)
771{
772 BUFFER_HANDLE result;
773 /* Codes_SRS_MQTT_CODEC_07_005: [If the parameters topicName is NULL then mqtt_codec_publish shall return NULL.] */
774 if (topicName == NULL)
775 {
776 result = NULL;
777 }
778 /* Codes_SRS_MQTT_CODEC_07_036: [mqtt_codec_publish shall return NULL if the buffLen variable is greater than the MAX_SEND_SIZE (0xFFFFFF7F).] */
779 else if (buffLen > MAX_SEND_SIZE)
780 {
781 /* Codes_SRS_MQTT_CODEC_07_006: [If any error is encountered then mqtt_codec_publish shall return NULL.] */
782 result = NULL;
783 }
784 else
785 {
786 PUBLISH_HEADER_INFO publishInfo ={ 0 };
787 publishInfo.topicName = topicName;
788 publishInfo.packetId = packetId;
789 publishInfo.qualityOfServiceValue = qosValue;
790
791 uint8_t headerFlags = 0;
792 if (duplicateMsg) headerFlags |= PUBLISH_DUP_FLAG;
793 if (serverRetain) headerFlags |= PUBLISH_QOS_RETAIN;
794 if (qosValue != DELIVER_AT_MOST_ONCE)
795 {
796 if (qosValue == DELIVER_AT_LEAST_ONCE)
797 {
798 headerFlags |= PUBLISH_QOS_AT_LEAST_ONCE;
799 }
800 else
801 {
802 headerFlags |= PUBLISH_QOS_EXACTLY_ONCE;
803 }
804 }
805
806 /* Codes_SRS_MQTT_CODEC_07_007: [mqtt_codec_publish shall return a BUFFER_HANDLE that represents a MQTT PUBLISH message.] */
807 result = BUFFER_new();
808 if (result != NULL)
809 {
810 STRING_HANDLE varible_header_log = NULL;
811 if (trace_log != NULL)
812 {
813 varible_header_log = STRING_construct_sprintf(" | IS_DUP: %s | RETAIN: %d | QOS: %s", duplicateMsg ? TRUE_CONST : FALSE_CONST,
814 serverRetain ? 1 : 0,
815 retrieve_qos_value(publishInfo.qualityOfServiceValue) );
816 }
817
818 if (constructPublishVariableHeader(result, &publishInfo, varible_header_log) != 0)
819 {
820 /* Codes_SRS_MQTT_CODEC_07_006: [If any error is encountered then mqtt_codec_publish shall return NULL.] */
821 BUFFER_delete(result);
822 result = NULL;
823 }
824 else
825 {
826 size_t payloadOffset = BUFFER_length(result);
827 if (buffLen > 0)
828 {
829 if (BUFFER_enlarge(result, buffLen) != 0)
830 {
831 /* Codes_SRS_MQTT_CODEC_07_006: [If any error is encountered then mqtt_codec_publish shall return NULL.] */
832 BUFFER_delete(result);
833 result = NULL;
834 }
835 else
836 {
837 uint8_t* iterator = BUFFER_u_char(result);
838 if (iterator == NULL)
839 {
840 /* Codes_SRS_MQTT_CODEC_07_006: [If any error is encountered then mqtt_codec_publish shall return NULL.] */
841 BUFFER_delete(result);
842 result = NULL;
843 }
844 else
845 {
846 iterator += payloadOffset;
847 // Write Message
848 (void)memcpy(iterator, msgBuffer, buffLen);
849 if (trace_log)
850 {
851 STRING_sprintf(varible_header_log, " | PAYLOAD_LEN: %lu", (unsigned long)buffLen);
852 }
853 }
854 }
855 }
856
857 if (result != NULL)
858 {
859 if (trace_log != NULL)
860 {
861 (void)STRING_copy(trace_log, "PUBLISH");
862 }
863 if (constructFixedHeader(result, PUBLISH_TYPE, headerFlags) != 0)
864 {
865 /* Codes_SRS_MQTT_CODEC_07_006: [If any error is encountered then mqtt_codec_publish shall return NULL.] */
866 BUFFER_delete(result);
867 result = NULL;
868 }
869 else
870 {
871 if (trace_log != NULL)
872 {
873 (void)STRING_concat_with_STRING(trace_log, varible_header_log);
874 }
875 }
876 }
877 }
878 if (varible_header_log != NULL)
879 {
880 STRING_delete(varible_header_log);
881 }
882 }
883 }
884 return result;
885}
886
887BUFFER_HANDLE mqtt_codec_publishAck(uint16_t packetId)
888{
889 /* Codes_SRS_MQTT_CODEC_07_013: [On success mqtt_codec_publishAck shall return a BUFFER_HANDLE representation of a MQTT PUBACK packet.] */
890 /* Codes_SRS_MQTT_CODEC_07_014 : [If any error is encountered then mqtt_codec_publishAck shall return NULL.] */
891 BUFFER_HANDLE result = constructPublishReply(PUBACK_TYPE, 0, packetId);
892 return result;
893}
894
895BUFFER_HANDLE mqtt_codec_publishReceived(uint16_t packetId)
896{
897 /* Codes_SRS_MQTT_CODEC_07_015: [On success mqtt_codec_publishRecieved shall return a BUFFER_HANDLE representation of a MQTT PUBREC packet.] */
898 /* Codes_SRS_MQTT_CODEC_07_016 : [If any error is encountered then mqtt_codec_publishRecieved shall return NULL.] */
899 BUFFER_HANDLE result = constructPublishReply(PUBREC_TYPE, 0, packetId);
900 return result;
901}
902
903BUFFER_HANDLE mqtt_codec_publishRelease(uint16_t packetId)
904{
905 /* Codes_SRS_MQTT_CODEC_07_017: [On success mqtt_codec_publishRelease shall return a BUFFER_HANDLE representation of a MQTT PUBREL packet.] */
906 /* Codes_SRS_MQTT_CODEC_07_018 : [If any error is encountered then mqtt_codec_publishRelease shall return NULL.] */
907 BUFFER_HANDLE result = constructPublishReply(PUBREL_TYPE, 2, packetId);
908 return result;
909}
910
911BUFFER_HANDLE mqtt_codec_publishComplete(uint16_t packetId)
912{
913 /* Codes_SRS_MQTT_CODEC_07_019: [On success mqtt_codec_publishComplete shall return a BUFFER_HANDLE representation of a MQTT PUBCOMP packet.] */
914 /* Codes_SRS_MQTT_CODEC_07_020 : [If any error is encountered then mqtt_codec_publishComplete shall return NULL.] */
915 BUFFER_HANDLE result = constructPublishReply(PUBCOMP_TYPE, 0, packetId);
916 return result;
917}
918
919BUFFER_HANDLE mqtt_codec_ping()
920{
921 /* Codes_SRS_MQTT_CODEC_07_021: [On success mqtt_codec_ping shall construct a BUFFER_HANDLE that represents a MQTT PINGREQ packet.] */
922 BUFFER_HANDLE result = BUFFER_new();
923 if (result != NULL)
924 {
925 if (BUFFER_enlarge(result, 2) != 0)
926 {
927 /* Codes_SRS_MQTT_CODEC_07_022: [If any error is encountered mqtt_codec_ping shall return NULL.] */
928 BUFFER_delete(result);
929 result = NULL;
930 }
931 else
932 {
933 uint8_t* iterator = BUFFER_u_char(result);
934 if (iterator == NULL)
935 {
936 /* Codes_SRS_MQTT_CODEC_07_022: [If any error is encountered mqtt_codec_ping shall return NULL.] */
937 BUFFER_delete(result);
938 result = NULL;
939 }
940 else
941 {
942 iterator[0] = PINGREQ_TYPE;
943 iterator[1] = 0;
944 }
945 }
946 }
947 return result;
948}
949
950BUFFER_HANDLE mqtt_codec_subscribe(uint16_t packetId, SUBSCRIBE_PAYLOAD* subscribeList, size_t count, STRING_HANDLE trace_log)
951{
952 BUFFER_HANDLE result;
953 /* Codes_SRS_MQTT_CODEC_07_023: [If the parameters subscribeList is NULL or if count is 0 then mqtt_codec_subscribe shall return NULL.] */
954 if (subscribeList == NULL || count == 0)
955 {
956 result = NULL;
957 }
958 else
959 {
960 /* Codes_SRS_MQTT_CODEC_07_026: [mqtt_codec_subscribe shall return a BUFFER_HANDLE that represents a MQTT SUBSCRIBE message.]*/
961 result = BUFFER_new();
962 if (result != NULL)
963 {
964 if (constructSubscibeTypeVariableHeader(result, packetId) != 0)
965 {
966 /* Codes_SRS_MQTT_CODEC_07_025: [If any error is encountered then mqtt_codec_subscribe shall return NULL.] */
967 BUFFER_delete(result);
968 result = NULL;
969 }
970 else
971 {
972 STRING_HANDLE sub_trace = NULL;
973 if (trace_log != NULL)
974 {
975 sub_trace = STRING_construct_sprintf(" | PACKET_ID: %"PRIu16, packetId);
976 }
977 /* Codes_SRS_MQTT_CODEC_07_024: [mqtt_codec_subscribe shall iterate through count items in the subscribeList.] */
978 if (addListItemsToSubscribePacket(result, subscribeList, count, sub_trace) != 0)
979 {
980 /* Codes_SRS_MQTT_CODEC_07_025: [If any error is encountered then mqtt_codec_subscribe shall return NULL.] */
981 BUFFER_delete(result);
982 result = NULL;
983 }
984 else
985 {
986
987 if (trace_log != NULL)
988 {
989 STRING_concat(trace_log, "SUBSCRIBE");
990 }
991 if (constructFixedHeader(result, SUBSCRIBE_TYPE, SUBSCRIBE_FIXED_HEADER_FLAG) != 0)
992 {
993 /* Codes_SRS_MQTT_CODEC_07_025: [If any error is encountered then mqtt_codec_subscribe shall return NULL.] */
994 BUFFER_delete(result);
995 result = NULL;
996 }
997 else
998 {
999 if (trace_log != NULL)
1000 {
1001 (void)STRING_concat_with_STRING(trace_log, sub_trace);
1002 }
1003 }
1004 }
1005 if (sub_trace != NULL)
1006 {
1007 STRING_delete(sub_trace);
1008 }
1009 }
1010 }
1011 }
1012 return result;
1013}
1014
1015BUFFER_HANDLE mqtt_codec_unsubscribe(uint16_t packetId, const char** unsubscribeList, size_t count, STRING_HANDLE trace_log)
1016{
1017 BUFFER_HANDLE result;
1018 /* Codes_SRS_MQTT_CODEC_07_027: [If the parameters unsubscribeList is NULL or if count is 0 then mqtt_codec_unsubscribe shall return NULL.] */
1019 if (unsubscribeList == NULL || count == 0)
1020 {
1021 result = NULL;
1022 }
1023 else
1024 {
1025 /* Codes_SRS_MQTT_CODEC_07_030: [mqtt_codec_unsubscribe shall return a BUFFER_HANDLE that represents a MQTT SUBSCRIBE message.] */
1026 result = BUFFER_new();
1027 if (result != NULL)
1028 {
1029 if (constructSubscibeTypeVariableHeader(result, packetId) != 0)
1030 {
1031 /* Codes_SRS_MQTT_CODEC_07_029: [If any error is encountered then mqtt_codec_unsubscribe shall return NULL.] */
1032 BUFFER_delete(result);
1033 result = NULL;
1034 }
1035 else
1036 {
1037 STRING_HANDLE unsub_trace = NULL;
1038 if (trace_log != NULL)
1039 {
1040 unsub_trace = STRING_construct_sprintf(" | PACKET_ID: %"PRIu16, packetId);
1041 }
1042 /* Codes_SRS_MQTT_CODEC_07_028: [mqtt_codec_unsubscribe shall iterate through count items in the unsubscribeList.] */
1043 if (addListItemsToUnsubscribePacket(result, unsubscribeList, count, unsub_trace) != 0)
1044 {
1045 /* Codes_SRS_MQTT_CODEC_07_029: [If any error is encountered then mqtt_codec_unsubscribe shall return NULL.] */
1046 BUFFER_delete(result);
1047 result = NULL;
1048 }
1049 else
1050 {
1051 if (trace_log != NULL)
1052 {
1053 (void)STRING_copy(trace_log, "UNSUBSCRIBE");
1054 }
1055 if (constructFixedHeader(result, UNSUBSCRIBE_TYPE, UNSUBSCRIBE_FIXED_HEADER_FLAG) != 0)
1056 {
1057 /* Codes_SRS_MQTT_CODEC_07_029: [If any error is encountered then mqtt_codec_unsubscribe shall return NULL.] */
1058 BUFFER_delete(result);
1059 result = NULL;
1060 }
1061 else
1062 {
1063 if (trace_log != NULL)
1064 {
1065 (void)STRING_concat_with_STRING(trace_log, unsub_trace);
1066 }
1067 }
1068 }
1069 if (unsub_trace != NULL)
1070 {
1071 STRING_delete(unsub_trace);
1072 }
1073 }
1074 }
1075 }
1076 return result;
1077}
1078
1079int mqtt_codec_bytesReceived(MQTTCODEC_HANDLE handle, const unsigned char* buffer, size_t size)
1080{
1081 int result;
1082 MQTTCODEC_INSTANCE* codec_Data = (MQTTCODEC_INSTANCE*)handle;
1083 /* Codes_SRS_MQTT_CODEC_07_031: [If the parameters handle or buffer is NULL then mqtt_codec_bytesReceived shall return a non-zero value.] */
1084 if (codec_Data == NULL)
1085 {
1086 result = MU_FAILURE;
1087 }
1088 /* Codes_SRS_MQTT_CODEC_07_031: [If the parameters handle or buffer is NULL then mqtt_codec_bytesReceived shall return a non-zero value.] */
1089 /* Codes_SRS_MQTT_CODEC_07_032: [If the parameters size is zero then mqtt_codec_bytesReceived shall return a non-zero value.] */
1090 else if (buffer == NULL || size == 0)
1091 {
1092 codec_Data->currPacket = PACKET_TYPE_ERROR;
1093 result = MU_FAILURE;
1094 }
1095 else
1096 {
1097 /* Codes_SRS_MQTT_CODEC_07_033: [mqtt_codec_bytesReceived constructs a sequence of bytes into the corresponding MQTT packets and on success returns zero.] */
1098 result = 0;
1099 size_t index = 0;
1100 for (index = 0; index < size && result == 0; index++)
1101 {
1102 uint8_t iterator = ((int8_t*)buffer)[index];
1103 if (codec_Data->codecState == CODEC_STATE_FIXED_HEADER)
1104 {
1105 if (codec_Data->currPacket == UNKNOWN_TYPE)
1106 {
1107 codec_Data->currPacket = processControlPacketType(iterator, &codec_Data->headerFlags);
1108 }
1109 else
1110 {
1111 if (prepareheaderDataInfo(codec_Data, iterator) != 0)
1112 {
1113 /* Codes_SRS_MQTT_CODEC_07_035: [If any error is encountered then the packet state will be marked as error and mqtt_codec_bytesReceived shall return a non-zero value.] */
1114 codec_Data->currPacket = PACKET_TYPE_ERROR;
1115 result = MU_FAILURE;
1116 }
1117 else if (codec_Data->currPacket == PINGRESP_TYPE)
1118 {
1119 /* Codes_SRS_MQTT_CODEC_07_034: [Upon a constructing a complete MQTT packet mqtt_codec_bytesReceived shall call the ON_PACKET_COMPLETE_CALLBACK function.] */
1120 completePacketData(codec_Data);
1121 }
1122 }
1123 }
1124 else if (codec_Data->codecState == CODEC_STATE_VAR_HEADER)
1125 {
1126 if (codec_Data->headerData == NULL)
1127 {
1128 codec_Data->codecState = CODEC_STATE_PAYLOAD;
1129 }
1130 else
1131 {
1132 uint8_t* dataBytes = BUFFER_u_char(codec_Data->headerData);
1133 if (dataBytes == NULL)
1134 {
1135 /* Codes_SRS_MQTT_CODEC_07_035: [If any error is encountered then the packet state will be marked as error and mqtt_codec_bytesReceived shall return a non-zero value.] */
1136 codec_Data->currPacket = PACKET_TYPE_ERROR;
1137 result = MU_FAILURE;
1138 }
1139 else
1140 {
1141 // Increment the data
1142 dataBytes += codec_Data->bufferOffset++;
1143 *dataBytes = iterator;
1144
1145 size_t totalLen = BUFFER_length(codec_Data->headerData);
1146 if (codec_Data->bufferOffset >= totalLen)
1147 {
1148 /* Codes_SRS_MQTT_CODEC_07_034: [Upon a constructing a complete MQTT packet mqtt_codec_bytesReceived shall call the ON_PACKET_COMPLETE_CALLBACK function.] */
1149 completePacketData(codec_Data);
1150 }
1151 }
1152 }
1153 }
1154 else
1155 {
1156 /* Codes_SRS_MQTT_CODEC_07_035: [If any error is encountered then the packet state will be marked as error and mqtt_codec_bytesReceived shall return a non-zero value.] */
1157 codec_Data->currPacket = PACKET_TYPE_ERROR;
1158 result = MU_FAILURE;
1159 }
1160 }
1161 }
1162 return result;
1163}
Note: See TracBrowser for help on using the repository browser.