source: azure_iot_hub_f767zi/trunk/azure_iot_sdk/umqtt/src/mqtt_codec.c@ 457

Last change on this file since 457 was 457, checked in by coas-nagasima, 4 years ago

ファイルを追加

  • Property svn:eol-style set to native
  • Property svn:mime-type set to text/x-csrc;charset=UTF-8
File size: 39.0 KB
Line 
1// Copyright (c) Microsoft. All rights reserved.
2// Licensed under the MIT license. See LICENSE file in the project root for full license information.
3
4#include <stdlib.h>
5#include <limits.h>
6#include "azure_c_shared_utility/optimize_size.h"
7#include "azure_c_shared_utility/gballoc.h"
8#include "azure_c_shared_utility/buffer_.h"
9#include "azure_c_shared_utility/strings.h"
10#include "azure_macro_utils/macro_utils.h"
11#include "azure_c_shared_utility/xlogging.h"
12#include "azure_umqtt_c/mqtt_codec.h"
13#include <inttypes.h>
14
/* Payload offset constant (not referenced by the code in this file's visible portion). */
#define PAYLOAD_OFFSET                  5
/* Upper nibble of the first fixed-header byte: the control packet type. */
#define PACKET_TYPE_BYTE(p)             (CONTROL_PACKET_TYPE)((uint8_t)(((uint8_t)(p)) & 0xf0))
/* Lower nibble of the first fixed-header byte: the per-packet flags. */
#define FLAG_VALUE_BYTE(p)              ((uint8_t)(((uint8_t)(p)) & 0xf))

/* Bits of the CONNECT flags byte. */
#define USERNAME_FLAG                   0x80
#define PASSWORD_FLAG                   0x40
#define WILL_RETAIN_FLAG                0x20
#define WILL_QOS_FLAG_                  0x18
#define WILL_FLAG_FLAG                  0x04
#define CLEAN_SESSION_FLAG              0x02

/* Continuation bit of a Remaining Length byte; also the base (128) of that encoding. */
#define NEXT_128_CHUNK                  0x80
/* Bits of the PUBLISH fixed-header flags nibble. */
#define PUBLISH_DUP_FLAG                0x8
#define PUBLISH_QOS_EXACTLY_ONCE        0x4
#define PUBLISH_QOS_AT_LEAST_ONCE       0x2
#define PUBLISH_QOS_RETAIN              0x1

/* Protocol level written into the CONNECT variable header. */
#define PROTOCOL_NUMBER                 4
/* Index of the CONNECT flags byte, counted from the start of the variable header. */
#define CONN_FLAG_BYTE_OFFSET           7

#define CONNECT_FIXED_HEADER_SIZE       2
#define CONNECT_VARIABLE_HEADER_SIZE    10
#define SUBSCRIBE_FIXED_HEADER_FLAG     0x2
#define UNSUBSCRIBE_FIXED_HEADER_FLAG   0x2

/* Largest value a four-byte Remaining Length can carry: 268435455.
   The previous value 0xFFFFFF7F was the wire encoding (0xFF 0xFF 0xFF 0x7F) of this
   limit rather than the limit itself, so oversized payloads were not rejected. */
#define MAX_SEND_SIZE                   0x0FFFFFFF // 268435455

// This captures the maximum packet size for 3 digits.
// If it's above this value then we bail out of the loop
#define MAX_3_DIGIT_PACKET_SIZE         2097152

/* Enumerator list expanded by MU_DEFINE_ENUM into the decoder's state type. */
#define CODEC_STATE_VALUES      \
    CODEC_STATE_FIXED_HEADER,   \
    CODEC_STATE_VAR_HEADER,     \
    CODEC_STATE_PAYLOAD
50
/* Spellings used when a boolean is rendered into a trace string. */
static const char *const TRUE_CONST = "true";
static const char *const FALSE_CONST = "false";

/* Declares CODEC_STATE_RESULT from the CODEC_STATE_VALUES enumerator list. */
MU_DEFINE_ENUM(CODEC_STATE_RESULT, CODEC_STATE_VALUES);
55
56typedef struct MQTTCODEC_INSTANCE_TAG
57{
58 CONTROL_PACKET_TYPE currPacket;
59 CODEC_STATE_RESULT codecState;
60 size_t bufferOffset;
61 int headerFlags;
62 BUFFER_HANDLE headerData;
63 ON_PACKET_COMPLETE_CALLBACK packetComplete;
64 void* callContext;
65 uint8_t storeRemainLen[4];
66 size_t remainLenIndex;
67} MQTTCODEC_INSTANCE;
68
69typedef struct PUBLISH_HEADER_INFO_TAG
70{
71 const char* topicName;
72 uint16_t packetId;
73 const char* msgBuffer;
74 QOS_VALUE qualityOfServiceValue;
75} PUBLISH_HEADER_INFO;
76
77static const char* retrieve_qos_value(QOS_VALUE value)
78{
79 switch (value)
80 {
81 case DELIVER_AT_MOST_ONCE:
82 return "DELIVER_AT_MOST_ONCE";
83 case DELIVER_AT_LEAST_ONCE:
84 return "DELIVER_AT_LEAST_ONCE";
85 case DELIVER_EXACTLY_ONCE:
86 default:
87 return "DELIVER_EXACTLY_ONCE";
88 }
89}
90
/* Stores one byte at *buffer and advances *buffer past it.
   Does nothing when buffer is NULL. */
static void byteutil_writeByte(uint8_t** buffer, uint8_t value)
{
    if (buffer == NULL)
    {
        return;
    }

    uint8_t* cursor = *buffer;
    *cursor = value;
    *buffer = cursor + 1;
}
99
/* Stores a 16-bit value at *buffer, most significant byte first, and advances
   *buffer by two. Does nothing when buffer is NULL.
   The bytes are narrowed with uint8_t casts; the earlier (char) casts went through
   a possibly signed type, which is implementation-defined for values above 127. */
static void byteutil_writeInt(uint8_t** buffer, uint16_t value)
{
    if (buffer != NULL)
    {
        uint8_t* cursor = *buffer;
        cursor[0] = (uint8_t)(value >> 8);
        cursor[1] = (uint8_t)(value & 0xFF);
        *buffer = cursor + 2;
    }
}
110
/* Writes a length-prefixed string at *buffer: a 16-bit big-endian length followed
   by len bytes of stringData (no terminator), then advances *buffer past both.
   Does nothing when buffer is NULL. When len is 0 or stringData is NULL only the
   length prefix is written, so memcpy is never handed a NULL source. */
static void byteutil_writeUTF(uint8_t** buffer, const char* stringData, uint16_t len)
{
    if (buffer != NULL)
    {
        if (stringData == NULL)
        {
            /* Nothing to copy: encode an empty string rather than reading through NULL. */
            len = 0;
        }
        byteutil_writeInt(buffer, len);
        if (len > 0)
        {
            (void)memcpy(*buffer, stringData, len);
            *buffer += len;
        }
    }
}
120
121static CONTROL_PACKET_TYPE processControlPacketType(uint8_t pktByte, int* flags)
122{
123 CONTROL_PACKET_TYPE result;
124 result = PACKET_TYPE_BYTE(pktByte);
125 if (flags != NULL)
126 {
127 *flags = FLAG_VALUE_BYTE(pktByte);
128 }
129 return result;
130}
131
132static int addListItemsToUnsubscribePacket(BUFFER_HANDLE ctrlPacket, const char** payloadList, size_t payloadCount, STRING_HANDLE trace_log)
133{
134 int result = 0;
135 size_t index = 0;
136 for (index = 0; index < payloadCount && result == 0; index++)
137 {
138 // Add the Payload
139 size_t offsetLen = BUFFER_length(ctrlPacket);
140 size_t topicLen = strlen(payloadList[index]);
141 if (topicLen > USHRT_MAX)
142 {
143 result = MU_FAILURE;
144 }
145 else if (BUFFER_enlarge(ctrlPacket, topicLen + 2) != 0)
146 {
147 result = MU_FAILURE;
148 }
149 else
150 {
151 uint8_t* iterator = BUFFER_u_char(ctrlPacket);
152 iterator += offsetLen;
153 byteutil_writeUTF(&iterator, payloadList[index], (uint16_t)topicLen);
154 }
155 if (trace_log != NULL)
156 {
157 STRING_sprintf(trace_log, " | TOPIC_NAME: %s", payloadList[index]);
158 }
159 }
160 return result;
161}
162
163static int addListItemsToSubscribePacket(BUFFER_HANDLE ctrlPacket, SUBSCRIBE_PAYLOAD* payloadList, size_t payloadCount, STRING_HANDLE trace_log)
164{
165 int result = 0;
166 size_t index = 0;
167 for (index = 0; index < payloadCount && result == 0; index++)
168 {
169 // Add the Payload
170 size_t offsetLen = BUFFER_length(ctrlPacket);
171 size_t topicLen = strlen(payloadList[index].subscribeTopic);
172 if (topicLen > USHRT_MAX)
173 {
174 result = MU_FAILURE;
175 }
176 else if (BUFFER_enlarge(ctrlPacket, topicLen + 2 + 1) != 0)
177 {
178 result = MU_FAILURE;
179 }
180 else
181 {
182 uint8_t* iterator = BUFFER_u_char(ctrlPacket);
183 iterator += offsetLen;
184 byteutil_writeUTF(&iterator, payloadList[index].subscribeTopic, (uint16_t)topicLen);
185 *iterator = payloadList[index].qosReturn;
186
187 if (trace_log != NULL)
188 {
189 STRING_sprintf(trace_log, " | TOPIC_NAME: %s | QOS: %d", payloadList[index].subscribeTopic, (int)payloadList[index].qosReturn);
190 }
191 }
192 }
193 return result;
194}
195
196static int constructConnectVariableHeader(BUFFER_HANDLE ctrlPacket, const MQTT_CLIENT_OPTIONS* mqttOptions, STRING_HANDLE trace_log)
197{
198 int result = 0;
199 if (BUFFER_enlarge(ctrlPacket, CONNECT_VARIABLE_HEADER_SIZE) != 0)
200 {
201 result = MU_FAILURE;
202 }
203 else
204 {
205 uint8_t* iterator = BUFFER_u_char(ctrlPacket);
206 if (iterator == NULL)
207 {
208 result = MU_FAILURE;
209 }
210 else
211 {
212 if (trace_log != NULL)
213 {
214 STRING_sprintf(trace_log, " | VER: %d | KEEPALIVE: %d | FLAGS:", PROTOCOL_NUMBER, mqttOptions->keepAliveInterval);
215 }
216 byteutil_writeUTF(&iterator, "MQTT", 4);
217 byteutil_writeByte(&iterator, PROTOCOL_NUMBER);
218 byteutil_writeByte(&iterator, 0); // Flags will be entered later
219 byteutil_writeInt(&iterator, mqttOptions->keepAliveInterval);
220 result = 0;
221 }
222 }
223 return result;
224}
225
226static int constructPublishVariableHeader(BUFFER_HANDLE ctrlPacket, const PUBLISH_HEADER_INFO* publishHeader, STRING_HANDLE trace_log)
227{
228 int result = 0;
229 size_t topicLen = 0;
230 size_t spaceLen = 0;
231 size_t idLen = 0;
232
233 size_t currLen = BUFFER_length(ctrlPacket);
234
235 topicLen = strlen(publishHeader->topicName);
236 spaceLen += 2;
237
238 if (publishHeader->qualityOfServiceValue != DELIVER_AT_MOST_ONCE)
239 {
240 // Packet Id is only set if the QOS is not 0
241 idLen = 2;
242 }
243
244 if (topicLen > USHRT_MAX)
245 {
246 result = MU_FAILURE;
247 }
248 else if (BUFFER_enlarge(ctrlPacket, topicLen + idLen + spaceLen) != 0)
249 {
250 result = MU_FAILURE;
251 }
252 else
253 {
254 uint8_t* iterator = BUFFER_u_char(ctrlPacket);
255 if (iterator == NULL)
256 {
257 result = MU_FAILURE;
258 }
259 else
260 {
261 iterator += currLen;
262 /* The Topic Name MUST be present as the first field in the PUBLISH Packet Variable header.It MUST be 792 a UTF-8 encoded string [MQTT-3.3.2-1] as defined in section 1.5.3.*/
263 byteutil_writeUTF(&iterator, publishHeader->topicName, (uint16_t)topicLen);
264 if (trace_log != NULL)
265 {
266 STRING_sprintf(trace_log, " | TOPIC_NAME: %s", publishHeader->topicName);
267 }
268 if (idLen > 0)
269 {
270 if (trace_log != NULL)
271 {
272 STRING_sprintf(trace_log, " | PACKET_ID: %"PRIu16, publishHeader->packetId);
273 }
274 byteutil_writeInt(&iterator, publishHeader->packetId);
275 }
276 result = 0;
277 }
278 }
279 return result;
280}
281
282static int constructSubscibeTypeVariableHeader(BUFFER_HANDLE ctrlPacket, uint16_t packetId)
283{
284 int result = 0;
285 if (BUFFER_enlarge(ctrlPacket, 2) != 0)
286 {
287 result = MU_FAILURE;
288 }
289 else
290 {
291 uint8_t* iterator = BUFFER_u_char(ctrlPacket);
292 if (iterator == NULL)
293 {
294 result = MU_FAILURE;
295 }
296 else
297 {
298 byteutil_writeInt(&iterator, packetId);
299 result = 0;
300 }
301 }
302 return result;
303}
304
305static BUFFER_HANDLE constructPublishReply(CONTROL_PACKET_TYPE type, uint8_t flags, uint16_t packetId)
306{
307 BUFFER_HANDLE result = BUFFER_new();
308 if (result != NULL)
309 {
310 if (BUFFER_pre_build(result, 4) != 0)
311 {
312 BUFFER_delete(result);
313 result = NULL;
314 }
315 else
316 {
317 uint8_t* iterator = BUFFER_u_char(result);
318 if (iterator == NULL)
319 {
320 BUFFER_delete(result);
321 result = NULL;
322 }
323 else
324 {
325 *iterator = (uint8_t)type | flags;
326 iterator++;
327 *iterator = 0x2;
328 iterator++;
329 byteutil_writeInt(&iterator, packetId);
330 }
331 }
332 }
333 return result;
334}
335
336static int constructFixedHeader(BUFFER_HANDLE ctrlPacket, CONTROL_PACKET_TYPE packetType, uint8_t flags)
337{
338 int result;
339 size_t packetLen = BUFFER_length(ctrlPacket);
340 uint8_t remainSize[4] ={ 0 };
341 size_t index = 0;
342
343 // Calculate the length of packet
344 do
345 {
346 uint8_t encode = packetLen % 128;
347 packetLen /= 128;
348 // if there are more data to encode, set the top bit of this byte
349 if (packetLen > 0)
350 {
351 encode |= NEXT_128_CHUNK;
352 }
353 remainSize[index++] = encode;
354 } while (packetLen > 0);
355
356 BUFFER_HANDLE fixedHeader = BUFFER_new();
357 if (fixedHeader == NULL)
358 {
359 result = MU_FAILURE;
360 }
361 else if (BUFFER_pre_build(fixedHeader, index + 1) != 0)
362 {
363 BUFFER_delete(fixedHeader);
364 result = MU_FAILURE;
365 }
366 else
367 {
368 uint8_t* iterator = BUFFER_u_char(fixedHeader);
369 *iterator = (uint8_t)packetType | flags;
370 iterator++;
371 (void)memcpy(iterator, remainSize, index);
372
373 result = BUFFER_prepend(ctrlPacket, fixedHeader);
374 BUFFER_delete(fixedHeader);
375 }
376 return result;
377}
378
379static int constructConnPayload(BUFFER_HANDLE ctrlPacket, const MQTT_CLIENT_OPTIONS* mqttOptions, STRING_HANDLE trace_log)
380{
381 int result = 0;
382 size_t clientLen = 0;
383 size_t usernameLen = 0;
384 size_t passwordLen = 0;
385 size_t willMessageLen = 0;
386 size_t willTopicLen = 0;
387 size_t spaceLen = 0;
388 size_t currLen = 0;
389 size_t totalLen = 0;
390
391 if (mqttOptions->clientId != NULL)
392 {
393 spaceLen += 2;
394 clientLen = strlen(mqttOptions->clientId);
395 }
396 if (mqttOptions->username != NULL)
397 {
398 spaceLen += 2;
399 usernameLen = strlen(mqttOptions->username);
400 }
401 if (mqttOptions->password != NULL)
402 {
403 spaceLen += 2;
404 passwordLen = strlen(mqttOptions->password);
405 }
406 if (mqttOptions->willMessage != NULL)
407 {
408 spaceLen += 2;
409 willMessageLen = strlen(mqttOptions->willMessage);
410 }
411 if (mqttOptions->willTopic != NULL)
412 {
413 spaceLen += 2;
414 willTopicLen = strlen(mqttOptions->willTopic);
415 }
416
417 currLen = BUFFER_length(ctrlPacket);
418 totalLen = clientLen + usernameLen + passwordLen + willMessageLen + willTopicLen + spaceLen;
419
420 // Validate the Username & Password
421 if (clientLen > USHRT_MAX)
422 {
423 result = MU_FAILURE;
424 }
425 else if (usernameLen == 0 && passwordLen > 0)
426 {
427 result = MU_FAILURE;
428 }
429 else if ((willMessageLen > 0 && willTopicLen == 0) || (willTopicLen > 0 && willMessageLen == 0))
430 {
431 result = MU_FAILURE;
432 }
433 else if (BUFFER_enlarge(ctrlPacket, totalLen) != 0)
434 {
435 result = MU_FAILURE;
436 }
437 else
438 {
439 uint8_t* packet = BUFFER_u_char(ctrlPacket);
440 uint8_t* iterator = packet;
441
442 iterator += currLen;
443 byteutil_writeUTF(&iterator, mqttOptions->clientId, (uint16_t)clientLen);
444
445 // TODO: Read on the Will Topic
446 if (willMessageLen > USHRT_MAX || willTopicLen > USHRT_MAX || usernameLen > USHRT_MAX || passwordLen > USHRT_MAX)
447 {
448 result = MU_FAILURE;
449 }
450 else
451 {
452 STRING_HANDLE connect_payload_trace = NULL;
453 if (trace_log != NULL)
454 {
455 connect_payload_trace = STRING_new();
456 }
457 if (willMessageLen > 0 && willTopicLen > 0)
458 {
459 if (trace_log != NULL)
460 {
461 (void)STRING_sprintf(connect_payload_trace, " | WILL_TOPIC: %s", mqttOptions->willTopic);
462 }
463 packet[CONN_FLAG_BYTE_OFFSET] |= WILL_FLAG_FLAG;
464 byteutil_writeUTF(&iterator, mqttOptions->willTopic, (uint16_t)willTopicLen);
465 packet[CONN_FLAG_BYTE_OFFSET] |= (mqttOptions->qualityOfServiceValue << 3);
466 if (mqttOptions->messageRetain)
467 {
468 packet[CONN_FLAG_BYTE_OFFSET] |= WILL_RETAIN_FLAG;
469 }
470 byteutil_writeUTF(&iterator, mqttOptions->willMessage, (uint16_t)willMessageLen);
471 }
472 if (usernameLen > 0)
473 {
474 packet[CONN_FLAG_BYTE_OFFSET] |= USERNAME_FLAG;
475 byteutil_writeUTF(&iterator, mqttOptions->username, (uint16_t)usernameLen);
476 if (trace_log != NULL)
477 {
478 (void)STRING_sprintf(connect_payload_trace, " | USERNAME: %s", mqttOptions->username);
479 }
480 }
481 if (passwordLen > 0)
482 {
483 packet[CONN_FLAG_BYTE_OFFSET] |= PASSWORD_FLAG;
484 byteutil_writeUTF(&iterator, mqttOptions->password, (uint16_t)passwordLen);
485 if (trace_log != NULL)
486 {
487 (void)STRING_sprintf(connect_payload_trace, " | PWD: XXXX");
488 }
489 }
490 // TODO: Get the rest of the flags
491 if (trace_log != NULL)
492 {
493 (void)STRING_sprintf(connect_payload_trace, " | CLEAN: %s", mqttOptions->useCleanSession ? "1" : "0");
494 }
495 if (mqttOptions->useCleanSession)
496 {
497 packet[CONN_FLAG_BYTE_OFFSET] |= CLEAN_SESSION_FLAG;
498 }
499 if (trace_log != NULL)
500 {
501 (void)STRING_sprintf(trace_log, " %lu", packet[CONN_FLAG_BYTE_OFFSET]);
502 (void)STRING_concat_with_STRING(trace_log, connect_payload_trace);
503 STRING_delete(connect_payload_trace);
504 }
505 result = 0;
506 }
507 }
508 return result;
509}
510
511static int prepareheaderDataInfo(MQTTCODEC_INSTANCE* codecData, uint8_t remainLen)
512{
513 int result;
514 result = 0;
515 codecData->storeRemainLen[codecData->remainLenIndex++] = remainLen;
516 if (remainLen <= 0x7f)
517 {
518 int multiplier = 1;
519 int totalLen = 0;
520 size_t index = 0;
521 uint8_t encodeByte = 0;
522 do
523 {
524 encodeByte = codecData->storeRemainLen[index++];
525 totalLen += (encodeByte & 127) * multiplier;
526 multiplier *= NEXT_128_CHUNK;
527
528 if (multiplier > MAX_3_DIGIT_PACKET_SIZE)
529 {
530 result = MU_FAILURE;
531 break;
532 }
533 } while ((encodeByte & NEXT_128_CHUNK) != 0);
534
535 if (result != 0 || totalLen > MAX_SEND_SIZE)
536 {
537 LogError("Receive buffer too large for MQTT packet");
538 result = MU_FAILURE;
539 }
540 else
541 {
542 codecData->codecState = CODEC_STATE_VAR_HEADER;
543
544 // Reset remainLen Index
545 codecData->remainLenIndex = 0;
546 memset(codecData->storeRemainLen, 0, 4 * sizeof(uint8_t));
547
548 if (totalLen > 0)
549 {
550 codecData->bufferOffset = 0;
551 codecData->headerData = BUFFER_new();
552 if (codecData->headerData == NULL)
553 {
554 /* Codes_SRS_MQTT_CODEC_07_035: [ If any error is encountered then the packet state will be marked as error and mqtt_codec_bytesReceived shall return a non-zero value. ] */
555 LogError("Failed BUFFER_new");
556 result = MU_FAILURE;
557 }
558 else
559 {
560 if (BUFFER_pre_build(codecData->headerData, totalLen) != 0)
561 {
562 /* Codes_SRS_MQTT_CODEC_07_035: [ If any error is encountered then the packet state will be marked as error and mqtt_codec_bytesReceived shall return a non-zero value. ] */
563 LogError("Failed BUFFER_pre_build");
564 result = MU_FAILURE;
565 }
566
567 }
568 }
569 }
570 }
571 return result;
572}
573
574static void completePacketData(MQTTCODEC_INSTANCE* codecData)
575{
576 if (codecData->packetComplete != NULL)
577 {
578 codecData->packetComplete(codecData->callContext, codecData->currPacket, codecData->headerFlags, codecData->headerData);
579 }
580
581 // Clean up data
582 codecData->currPacket = UNKNOWN_TYPE;
583 codecData->codecState = CODEC_STATE_FIXED_HEADER;
584 codecData->headerFlags = 0;
585 BUFFER_delete(codecData->headerData);
586 codecData->headerData = NULL;
587}
588
589static void clear_codec_data(MQTTCODEC_INSTANCE* codec_data)
590{
591 // Clear the code instance data
592 codec_data->currPacket = UNKNOWN_TYPE;
593 codec_data->codecState = CODEC_STATE_FIXED_HEADER;
594 codec_data->headerFlags = 0;
595 codec_data->bufferOffset = 0;
596 codec_data->headerData = NULL;
597 memset(codec_data->storeRemainLen, 0, 4 * sizeof(uint8_t));
598 codec_data->remainLenIndex = 0;
599}
600
601void mqtt_codec_reset(MQTTCODEC_HANDLE handle)
602{
603 if (handle != NULL)
604 {
605 clear_codec_data(handle);
606 }
607}
608
609MQTTCODEC_HANDLE mqtt_codec_create(ON_PACKET_COMPLETE_CALLBACK packetComplete, void* callbackCtx)
610{
611 MQTTCODEC_HANDLE result;
612 result = malloc(sizeof(MQTTCODEC_INSTANCE));
613 /* Codes_SRS_MQTT_CODEC_07_001: [If a failure is encountered then mqtt_codec_create shall return NULL.] */
614 if (result != NULL)
615 {
616 /* Codes_SRS_MQTT_CODEC_07_002: [On success mqtt_codec_create shall return a MQTTCODEC_HANDLE value.] */
617 clear_codec_data(result);
618 result->packetComplete = packetComplete;
619 result->callContext = callbackCtx;
620 }
621 return result;
622}
623
624void mqtt_codec_destroy(MQTTCODEC_HANDLE handle)
625{
626 /* Codes_SRS_MQTT_CODEC_07_003: [If the handle parameter is NULL then mqtt_codec_destroy shall do nothing.] */
627 if (handle != NULL)
628 {
629 MQTTCODEC_INSTANCE* codecData = (MQTTCODEC_INSTANCE*)handle;
630 /* Codes_SRS_MQTT_CODEC_07_004: [mqtt_codec_destroy shall deallocate all memory that has been allocated by this object.] */
631 BUFFER_delete(codecData->headerData);
632 free(codecData);
633 }
634}
635
636BUFFER_HANDLE mqtt_codec_connect(const MQTT_CLIENT_OPTIONS* mqttOptions, STRING_HANDLE trace_log)
637{
638 BUFFER_HANDLE result;
639 /* Codes_SRS_MQTT_CODEC_07_008: [If the parameters mqttOptions is NULL then mqtt_codec_connect shall return a null value.] */
640 if (mqttOptions == NULL)
641 {
642 result = NULL;
643 }
644 else
645 {
646 /* Codes_SRS_MQTT_CODEC_07_009: [mqtt_codec_connect shall construct a BUFFER_HANDLE that represents a MQTT CONNECT packet.] */
647 result = BUFFER_new();
648 if (result != NULL)
649 {
650 STRING_HANDLE varible_header_log = NULL;
651 if (trace_log != NULL)
652 {
653 varible_header_log = STRING_new();
654 }
655 // Add Variable Header Information
656 if (constructConnectVariableHeader(result, mqttOptions, varible_header_log) != 0)
657 {
658 /* Codes_SRS_MQTT_CODEC_07_010: [If any error is encountered then mqtt_codec_connect shall return NULL.] */
659 BUFFER_delete(result);
660 result = NULL;
661 }
662 else
663 {
664 if (constructConnPayload(result, mqttOptions, varible_header_log) != 0)
665 {
666 /* Codes_SRS_MQTT_CODEC_07_010: [If any error is encountered then mqtt_codec_connect shall return NULL.] */
667 BUFFER_delete(result);
668 result = NULL;
669 }
670 else
671 {
672 if (trace_log != NULL)
673 {
674 (void)STRING_copy(trace_log, "CONNECT");
675 }
676 if (constructFixedHeader(result, CONNECT_TYPE, 0) != 0)
677 {
678 /* Codes_SRS_MQTT_CODEC_07_010: [If any error is encountered then mqtt_codec_connect shall return NULL.] */
679 BUFFER_delete(result);
680 result = NULL;
681 }
682 else
683 {
684 if (trace_log != NULL)
685 {
686 (void)STRING_concat_with_STRING(trace_log, varible_header_log);
687 }
688 }
689 }
690 if (varible_header_log != NULL)
691 {
692 STRING_delete(varible_header_log);
693 }
694 }
695 }
696 }
697 return result;
698}
699
700BUFFER_HANDLE mqtt_codec_disconnect()
701{
702 /* Codes_SRS_MQTT_CODEC_07_011: [On success mqtt_codec_disconnect shall construct a BUFFER_HANDLE that represents a MQTT DISCONNECT packet.] */
703 BUFFER_HANDLE result = BUFFER_new();
704 if (result != NULL)
705 {
706 if (BUFFER_enlarge(result, 2) != 0)
707 {
708 /* Codes_SRS_MQTT_CODEC_07_012: [If any error is encountered mqtt_codec_disconnect shall return NULL.] */
709 BUFFER_delete(result);
710 result = NULL;
711 }
712 else
713 {
714 uint8_t* iterator = BUFFER_u_char(result);
715 if (iterator == NULL)
716 {
717 /* Codes_SRS_MQTT_CODEC_07_012: [If any error is encountered mqtt_codec_disconnect shall return NULL.] */
718 BUFFER_delete(result);
719 result = NULL;
720 }
721 else
722 {
723 iterator[0] = DISCONNECT_TYPE;
724 iterator[1] = 0;
725 }
726 }
727 }
728 return result;
729}
730
731BUFFER_HANDLE mqtt_codec_publish(QOS_VALUE qosValue, bool duplicateMsg, bool serverRetain, uint16_t packetId, const char* topicName, const uint8_t* msgBuffer, size_t buffLen, STRING_HANDLE trace_log)
732{
733 BUFFER_HANDLE result;
734 /* Codes_SRS_MQTT_CODEC_07_005: [If the parameters topicName is NULL then mqtt_codec_publish shall return NULL.] */
735 if (topicName == NULL)
736 {
737 result = NULL;
738 }
739 /* Codes_SRS_MQTT_CODEC_07_036: [mqtt_codec_publish shall return NULL if the buffLen variable is greater than the MAX_SEND_SIZE (0xFFFFFF7F).] */
740 else if (buffLen > MAX_SEND_SIZE)
741 {
742 /* Codes_SRS_MQTT_CODEC_07_006: [If any error is encountered then mqtt_codec_publish shall return NULL.] */
743 result = NULL;
744 }
745 else
746 {
747 PUBLISH_HEADER_INFO publishInfo ={ 0 };
748 publishInfo.topicName = topicName;
749 publishInfo.packetId = packetId;
750 publishInfo.qualityOfServiceValue = qosValue;
751
752 uint8_t headerFlags = 0;
753 if (duplicateMsg) headerFlags |= PUBLISH_DUP_FLAG;
754 if (serverRetain) headerFlags |= PUBLISH_QOS_RETAIN;
755 if (qosValue != DELIVER_AT_MOST_ONCE)
756 {
757 if (qosValue == DELIVER_AT_LEAST_ONCE)
758 {
759 headerFlags |= PUBLISH_QOS_AT_LEAST_ONCE;
760 }
761 else
762 {
763 headerFlags |= PUBLISH_QOS_EXACTLY_ONCE;
764 }
765 }
766
767 /* Codes_SRS_MQTT_CODEC_07_007: [mqtt_codec_publish shall return a BUFFER_HANDLE that represents a MQTT PUBLISH message.] */
768 result = BUFFER_new();
769 if (result != NULL)
770 {
771 STRING_HANDLE varible_header_log = NULL;
772 if (trace_log != NULL)
773 {
774 varible_header_log = STRING_construct_sprintf(" | IS_DUP: %s | RETAIN: %d | QOS: %s", duplicateMsg ? TRUE_CONST : FALSE_CONST,
775 serverRetain ? 1 : 0,
776 retrieve_qos_value(publishInfo.qualityOfServiceValue) );
777 }
778
779 if (constructPublishVariableHeader(result, &publishInfo, varible_header_log) != 0)
780 {
781 /* Codes_SRS_MQTT_CODEC_07_006: [If any error is encountered then mqtt_codec_publish shall return NULL.] */
782 BUFFER_delete(result);
783 result = NULL;
784 }
785 else
786 {
787 size_t payloadOffset = BUFFER_length(result);
788 if (buffLen > 0)
789 {
790 if (BUFFER_enlarge(result, buffLen) != 0)
791 {
792 /* Codes_SRS_MQTT_CODEC_07_006: [If any error is encountered then mqtt_codec_publish shall return NULL.] */
793 BUFFER_delete(result);
794 result = NULL;
795 }
796 else
797 {
798 uint8_t* iterator = BUFFER_u_char(result);
799 if (iterator == NULL)
800 {
801 /* Codes_SRS_MQTT_CODEC_07_006: [If any error is encountered then mqtt_codec_publish shall return NULL.] */
802 BUFFER_delete(result);
803 result = NULL;
804 }
805 else
806 {
807 iterator += payloadOffset;
808 // Write Message
809 (void)memcpy(iterator, msgBuffer, buffLen);
810 if (trace_log)
811 {
812 STRING_sprintf(varible_header_log, " | PAYLOAD_LEN: %lu", (unsigned long)buffLen);
813 }
814 }
815 }
816 }
817
818 if (result != NULL)
819 {
820 if (trace_log != NULL)
821 {
822 (void)STRING_copy(trace_log, "PUBLISH");
823 }
824 if (constructFixedHeader(result, PUBLISH_TYPE, headerFlags) != 0)
825 {
826 /* Codes_SRS_MQTT_CODEC_07_006: [If any error is encountered then mqtt_codec_publish shall return NULL.] */
827 BUFFER_delete(result);
828 result = NULL;
829 }
830 else
831 {
832 if (trace_log != NULL)
833 {
834 (void)STRING_concat_with_STRING(trace_log, varible_header_log);
835 }
836 }
837 }
838 }
839 if (varible_header_log != NULL)
840 {
841 STRING_delete(varible_header_log);
842 }
843 }
844 }
845 return result;
846}
847
848BUFFER_HANDLE mqtt_codec_publishAck(uint16_t packetId)
849{
850 /* Codes_SRS_MQTT_CODEC_07_013: [On success mqtt_codec_publishAck shall return a BUFFER_HANDLE representation of a MQTT PUBACK packet.] */
851 /* Codes_SRS_MQTT_CODEC_07_014 : [If any error is encountered then mqtt_codec_publishAck shall return NULL.] */
852 BUFFER_HANDLE result = constructPublishReply(PUBACK_TYPE, 0, packetId);
853 return result;
854}
855
856BUFFER_HANDLE mqtt_codec_publishReceived(uint16_t packetId)
857{
858 /* Codes_SRS_MQTT_CODEC_07_015: [On success mqtt_codec_publishRecieved shall return a BUFFER_HANDLE representation of a MQTT PUBREC packet.] */
859 /* Codes_SRS_MQTT_CODEC_07_016 : [If any error is encountered then mqtt_codec_publishRecieved shall return NULL.] */
860 BUFFER_HANDLE result = constructPublishReply(PUBREC_TYPE, 0, packetId);
861 return result;
862}
863
864BUFFER_HANDLE mqtt_codec_publishRelease(uint16_t packetId)
865{
866 /* Codes_SRS_MQTT_CODEC_07_017: [On success mqtt_codec_publishRelease shall return a BUFFER_HANDLE representation of a MQTT PUBREL packet.] */
867 /* Codes_SRS_MQTT_CODEC_07_018 : [If any error is encountered then mqtt_codec_publishRelease shall return NULL.] */
868 BUFFER_HANDLE result = constructPublishReply(PUBREL_TYPE, 2, packetId);
869 return result;
870}
871
872BUFFER_HANDLE mqtt_codec_publishComplete(uint16_t packetId)
873{
874 /* Codes_SRS_MQTT_CODEC_07_019: [On success mqtt_codec_publishComplete shall return a BUFFER_HANDLE representation of a MQTT PUBCOMP packet.] */
875 /* Codes_SRS_MQTT_CODEC_07_020 : [If any error is encountered then mqtt_codec_publishComplete shall return NULL.] */
876 BUFFER_HANDLE result = constructPublishReply(PUBCOMP_TYPE, 0, packetId);
877 return result;
878}
879
880BUFFER_HANDLE mqtt_codec_ping()
881{
882 /* Codes_SRS_MQTT_CODEC_07_021: [On success mqtt_codec_ping shall construct a BUFFER_HANDLE that represents a MQTT PINGREQ packet.] */
883 BUFFER_HANDLE result = BUFFER_new();
884 if (result != NULL)
885 {
886 if (BUFFER_enlarge(result, 2) != 0)
887 {
888 /* Codes_SRS_MQTT_CODEC_07_022: [If any error is encountered mqtt_codec_ping shall return NULL.] */
889 BUFFER_delete(result);
890 result = NULL;
891 }
892 else
893 {
894 uint8_t* iterator = BUFFER_u_char(result);
895 if (iterator == NULL)
896 {
897 /* Codes_SRS_MQTT_CODEC_07_022: [If any error is encountered mqtt_codec_ping shall return NULL.] */
898 BUFFER_delete(result);
899 result = NULL;
900 }
901 else
902 {
903 iterator[0] = PINGREQ_TYPE;
904 iterator[1] = 0;
905 }
906 }
907 }
908 return result;
909}
910
911BUFFER_HANDLE mqtt_codec_subscribe(uint16_t packetId, SUBSCRIBE_PAYLOAD* subscribeList, size_t count, STRING_HANDLE trace_log)
912{
913 BUFFER_HANDLE result;
914 /* Codes_SRS_MQTT_CODEC_07_023: [If the parameters subscribeList is NULL or if count is 0 then mqtt_codec_subscribe shall return NULL.] */
915 if (subscribeList == NULL || count == 0)
916 {
917 result = NULL;
918 }
919 else
920 {
921 /* Codes_SRS_MQTT_CODEC_07_026: [mqtt_codec_subscribe shall return a BUFFER_HANDLE that represents a MQTT SUBSCRIBE message.]*/
922 result = BUFFER_new();
923 if (result != NULL)
924 {
925 if (constructSubscibeTypeVariableHeader(result, packetId) != 0)
926 {
927 /* Codes_SRS_MQTT_CODEC_07_025: [If any error is encountered then mqtt_codec_subscribe shall return NULL.] */
928 BUFFER_delete(result);
929 result = NULL;
930 }
931 else
932 {
933 STRING_HANDLE sub_trace = NULL;
934 if (trace_log != NULL)
935 {
936 sub_trace = STRING_construct_sprintf(" | PACKET_ID: %"PRIu16, packetId);
937 }
938 /* Codes_SRS_MQTT_CODEC_07_024: [mqtt_codec_subscribe shall iterate through count items in the subscribeList.] */
939 if (addListItemsToSubscribePacket(result, subscribeList, count, sub_trace) != 0)
940 {
941 /* Codes_SRS_MQTT_CODEC_07_025: [If any error is encountered then mqtt_codec_subscribe shall return NULL.] */
942 BUFFER_delete(result);
943 result = NULL;
944 }
945 else
946 {
947
948 if (trace_log != NULL)
949 {
950 STRING_concat(trace_log, "SUBSCRIBE");
951 }
952 if (constructFixedHeader(result, SUBSCRIBE_TYPE, SUBSCRIBE_FIXED_HEADER_FLAG) != 0)
953 {
954 /* Codes_SRS_MQTT_CODEC_07_025: [If any error is encountered then mqtt_codec_subscribe shall return NULL.] */
955 BUFFER_delete(result);
956 result = NULL;
957 }
958 else
959 {
960 if (trace_log != NULL)
961 {
962 (void)STRING_concat_with_STRING(trace_log, sub_trace);
963 }
964 }
965 }
966 if (sub_trace != NULL)
967 {
968 STRING_delete(sub_trace);
969 }
970 }
971 }
972 }
973 return result;
974}
975
976BUFFER_HANDLE mqtt_codec_unsubscribe(uint16_t packetId, const char** unsubscribeList, size_t count, STRING_HANDLE trace_log)
977{
978 BUFFER_HANDLE result;
979 /* Codes_SRS_MQTT_CODEC_07_027: [If the parameters unsubscribeList is NULL or if count is 0 then mqtt_codec_unsubscribe shall return NULL.] */
980 if (unsubscribeList == NULL || count == 0)
981 {
982 result = NULL;
983 }
984 else
985 {
986 /* Codes_SRS_MQTT_CODEC_07_030: [mqtt_codec_unsubscribe shall return a BUFFER_HANDLE that represents a MQTT SUBSCRIBE message.] */
987 result = BUFFER_new();
988 if (result != NULL)
989 {
990 if (constructSubscibeTypeVariableHeader(result, packetId) != 0)
991 {
992 /* Codes_SRS_MQTT_CODEC_07_029: [If any error is encountered then mqtt_codec_unsubscribe shall return NULL.] */
993 BUFFER_delete(result);
994 result = NULL;
995 }
996 else
997 {
998 STRING_HANDLE unsub_trace = NULL;
999 if (trace_log != NULL)
1000 {
1001 unsub_trace = STRING_construct_sprintf(" | PACKET_ID: %"PRIu16, packetId);
1002 }
1003 /* Codes_SRS_MQTT_CODEC_07_028: [mqtt_codec_unsubscribe shall iterate through count items in the unsubscribeList.] */
1004 if (addListItemsToUnsubscribePacket(result, unsubscribeList, count, unsub_trace) != 0)
1005 {
1006 /* Codes_SRS_MQTT_CODEC_07_029: [If any error is encountered then mqtt_codec_unsubscribe shall return NULL.] */
1007 BUFFER_delete(result);
1008 result = NULL;
1009 }
1010 else
1011 {
1012 if (trace_log != NULL)
1013 {
1014 (void)STRING_copy(trace_log, "UNSUBSCRIBE");
1015 }
1016 if (constructFixedHeader(result, UNSUBSCRIBE_TYPE, UNSUBSCRIBE_FIXED_HEADER_FLAG) != 0)
1017 {
1018 /* Codes_SRS_MQTT_CODEC_07_029: [If any error is encountered then mqtt_codec_unsubscribe shall return NULL.] */
1019 BUFFER_delete(result);
1020 result = NULL;
1021 }
1022 else
1023 {
1024 if (trace_log != NULL)
1025 {
1026 (void)STRING_concat_with_STRING(trace_log, unsub_trace);
1027 }
1028 }
1029 }
1030 if (unsub_trace != NULL)
1031 {
1032 STRING_delete(unsub_trace);
1033 }
1034 }
1035 }
1036 }
1037 return result;
1038}
1039
1040int mqtt_codec_bytesReceived(MQTTCODEC_HANDLE handle, const unsigned char* buffer, size_t size)
1041{
1042 int result;
1043 MQTTCODEC_INSTANCE* codec_Data = (MQTTCODEC_INSTANCE*)handle;
1044 /* Codes_SRS_MQTT_CODEC_07_031: [If the parameters handle or buffer is NULL then mqtt_codec_bytesReceived shall return a non-zero value.] */
1045 if (codec_Data == NULL)
1046 {
1047 result = MU_FAILURE;
1048 }
1049 /* Codes_SRS_MQTT_CODEC_07_031: [If the parameters handle or buffer is NULL then mqtt_codec_bytesReceived shall return a non-zero value.] */
1050 /* Codes_SRS_MQTT_CODEC_07_032: [If the parameters size is zero then mqtt_codec_bytesReceived shall return a non-zero value.] */
1051 else if (buffer == NULL || size == 0)
1052 {
1053 codec_Data->currPacket = PACKET_TYPE_ERROR;
1054 result = MU_FAILURE;
1055 }
1056 else
1057 {
1058 /* Codes_SRS_MQTT_CODEC_07_033: [mqtt_codec_bytesReceived constructs a sequence of bytes into the corresponding MQTT packets and on success returns zero.] */
1059 result = 0;
1060 size_t index = 0;
1061 for (index = 0; index < size && result == 0; index++)
1062 {
1063 uint8_t iterator = ((int8_t*)buffer)[index];
1064 if (codec_Data->codecState == CODEC_STATE_FIXED_HEADER)
1065 {
1066 if (codec_Data->currPacket == UNKNOWN_TYPE)
1067 {
1068 codec_Data->currPacket = processControlPacketType(iterator, &codec_Data->headerFlags);
1069 }
1070 else
1071 {
1072 if (prepareheaderDataInfo(codec_Data, iterator) != 0)
1073 {
1074 /* Codes_SRS_MQTT_CODEC_07_035: [If any error is encountered then the packet state will be marked as error and mqtt_codec_bytesReceived shall return a non-zero value.] */
1075 codec_Data->currPacket = PACKET_TYPE_ERROR;
1076 result = MU_FAILURE;
1077 }
1078 else if (codec_Data->currPacket == PINGRESP_TYPE)
1079 {
1080 /* Codes_SRS_MQTT_CODEC_07_034: [Upon a constructing a complete MQTT packet mqtt_codec_bytesReceived shall call the ON_PACKET_COMPLETE_CALLBACK function.] */
1081 completePacketData(codec_Data);
1082 }
1083 }
1084 }
1085 else if (codec_Data->codecState == CODEC_STATE_VAR_HEADER)
1086 {
1087 if (codec_Data->headerData == NULL)
1088 {
1089 codec_Data->codecState = CODEC_STATE_PAYLOAD;
1090 }
1091 else
1092 {
1093 uint8_t* dataBytes = BUFFER_u_char(codec_Data->headerData);
1094 if (dataBytes == NULL)
1095 {
1096 /* Codes_SRS_MQTT_CODEC_07_035: [If any error is encountered then the packet state will be marked as error and mqtt_codec_bytesReceived shall return a non-zero value.] */
1097 codec_Data->currPacket = PACKET_TYPE_ERROR;
1098 result = MU_FAILURE;
1099 }
1100 else
1101 {
1102 // Increment the data
1103 dataBytes += codec_Data->bufferOffset++;
1104 *dataBytes = iterator;
1105
1106 size_t totalLen = BUFFER_length(codec_Data->headerData);
1107 if (codec_Data->bufferOffset >= totalLen)
1108 {
1109 /* Codes_SRS_MQTT_CODEC_07_034: [Upon a constructing a complete MQTT packet mqtt_codec_bytesReceived shall call the ON_PACKET_COMPLETE_CALLBACK function.] */
1110 completePacketData(codec_Data);
1111 }
1112 }
1113 }
1114 }
1115 else
1116 {
1117 /* Codes_SRS_MQTT_CODEC_07_035: [If any error is encountered then the packet state will be marked as error and mqtt_codec_bytesReceived shall return a non-zero value.] */
1118 codec_Data->currPacket = PACKET_TYPE_ERROR;
1119 result = MU_FAILURE;
1120 }
1121 }
1122 }
1123 return result;
1124}
Note: See TracBrowser for help on using the repository browser.