source: rtos_arduino/trunk/arduino_lib/libraries/pubsubclient-2.6/src/PubSubClient.cpp@ 209

Last change on this file since 209 was 209, checked in by ertl-honda, 8 years ago

BlueMix用のファイルを追加

File size: 16.9 KB
Line 
1/*
2 PubSubClient.cpp - A simple client for MQTT.
3 Nick O'Leary
4 http://knolleary.net
5*/
6
7#include "PubSubClient.h"
8#include "Arduino.h"
9
10PubSubClient::PubSubClient() {
11 this->_state = MQTT_DISCONNECTED;
12 this->_client = NULL;
13 this->stream = NULL;
14 setCallback(NULL);
15}
16
17PubSubClient::PubSubClient(Client& client) {
18 this->_state = MQTT_DISCONNECTED;
19 setClient(client);
20 this->stream = NULL;
21}
22
23PubSubClient::PubSubClient(IPAddress addr, uint16_t port, Client& client) {
24 this->_state = MQTT_DISCONNECTED;
25 setServer(addr, port);
26 setClient(client);
27 this->stream = NULL;
28}
29PubSubClient::PubSubClient(IPAddress addr, uint16_t port, Client& client, Stream& stream) {
30 this->_state = MQTT_DISCONNECTED;
31 setServer(addr,port);
32 setClient(client);
33 setStream(stream);
34}
35PubSubClient::PubSubClient(IPAddress addr, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client) {
36 this->_state = MQTT_DISCONNECTED;
37 setServer(addr, port);
38 setCallback(callback);
39 setClient(client);
40 this->stream = NULL;
41}
42PubSubClient::PubSubClient(IPAddress addr, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client, Stream& stream) {
43 this->_state = MQTT_DISCONNECTED;
44 setServer(addr,port);
45 setCallback(callback);
46 setClient(client);
47 setStream(stream);
48}
49
50PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, Client& client) {
51 this->_state = MQTT_DISCONNECTED;
52 setServer(ip, port);
53 setClient(client);
54 this->stream = NULL;
55}
56PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, Client& client, Stream& stream) {
57 this->_state = MQTT_DISCONNECTED;
58 setServer(ip,port);
59 setClient(client);
60 setStream(stream);
61}
62PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client) {
63 this->_state = MQTT_DISCONNECTED;
64 setServer(ip, port);
65 setCallback(callback);
66 setClient(client);
67 this->stream = NULL;
68}
69PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client, Stream& stream) {
70 this->_state = MQTT_DISCONNECTED;
71 setServer(ip,port);
72 setCallback(callback);
73 setClient(client);
74 setStream(stream);
75}
76
77PubSubClient::PubSubClient(const char* domain, uint16_t port, Client& client) {
78 this->_state = MQTT_DISCONNECTED;
79 setServer(domain,port);
80 setClient(client);
81 this->stream = NULL;
82}
83PubSubClient::PubSubClient(const char* domain, uint16_t port, Client& client, Stream& stream) {
84 this->_state = MQTT_DISCONNECTED;
85 setServer(domain,port);
86 setClient(client);
87 setStream(stream);
88}
89PubSubClient::PubSubClient(const char* domain, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client) {
90 this->_state = MQTT_DISCONNECTED;
91 setServer(domain,port);
92 setCallback(callback);
93 setClient(client);
94 this->stream = NULL;
95}
96PubSubClient::PubSubClient(const char* domain, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client, Stream& stream) {
97 this->_state = MQTT_DISCONNECTED;
98 setServer(domain,port);
99 setCallback(callback);
100 setClient(client);
101 setStream(stream);
102}
103
104boolean PubSubClient::connect(const char *id) {
105 return connect(id,NULL,NULL,0,0,0,0);
106}
107
108boolean PubSubClient::connect(const char *id, const char *user, const char *pass) {
109 return connect(id,user,pass,0,0,0,0);
110}
111
112boolean PubSubClient::connect(const char *id, const char* willTopic, uint8_t willQos, boolean willRetain, const char* willMessage) {
113 return connect(id,NULL,NULL,willTopic,willQos,willRetain,willMessage);
114}
115
116boolean PubSubClient::connect(const char *id, const char *user, const char *pass, const char* willTopic, uint8_t willQos, boolean willRetain, const char* willMessage) {
117 if (!connected()) {
118 int result = 0;
119
120 if (domain != NULL) {
121 result = _client->connect(this->domain, this->port);
122 } else {
123 result = _client->connect(this->ip, this->port);
124 }
125 if (result == 1) {
126 nextMsgId = 1;
127 // Leave room in the buffer for header and variable length field
128 uint16_t length = 5;
129 unsigned int j;
130
131#if MQTT_VERSION == MQTT_VERSION_3_1
132 uint8_t d[9] = {0x00,0x06,'M','Q','I','s','d','p', MQTT_VERSION};
133#define MQTT_HEADER_VERSION_LENGTH 9
134#elif MQTT_VERSION == MQTT_VERSION_3_1_1
135 uint8_t d[7] = {0x00,0x04,'M','Q','T','T',MQTT_VERSION};
136#define MQTT_HEADER_VERSION_LENGTH 7
137#endif
138 for (j = 0;j<MQTT_HEADER_VERSION_LENGTH;j++) {
139 buffer[length++] = d[j];
140 }
141
142 uint8_t v;
143 if (willTopic) {
144 v = 0x06|(willQos<<3)|(willRetain<<5);
145 } else {
146 v = 0x02;
147 }
148
149 if(user != NULL) {
150 v = v|0x80;
151
152 if(pass != NULL) {
153 v = v|(0x80>>1);
154 }
155 }
156
157 buffer[length++] = v;
158
159 buffer[length++] = ((MQTT_KEEPALIVE) >> 8);
160 buffer[length++] = ((MQTT_KEEPALIVE) & 0xFF);
161 length = writeString(id,buffer,length);
162 if (willTopic) {
163 length = writeString(willTopic,buffer,length);
164 length = writeString(willMessage,buffer,length);
165 }
166
167 if(user != NULL) {
168 length = writeString(user,buffer,length);
169 if(pass != NULL) {
170 length = writeString(pass,buffer,length);
171 }
172 }
173
174 write(MQTTCONNECT,buffer,length-5);
175
176 lastInActivity = lastOutActivity = millis();
177
178 while (!_client->available()) {
179 unsigned long t = millis();
180 if (t-lastInActivity >= ((int32_t) MQTT_SOCKET_TIMEOUT*1000UL)) {
181 _state = MQTT_CONNECTION_TIMEOUT;
182 _client->stop();
183 return false;
184 }
185 }
186 uint8_t llen;
187 uint16_t len = readPacket(&llen);
188
189 if (len == 4) {
190 if (buffer[3] == 0) {
191 lastInActivity = millis();
192 pingOutstanding = false;
193 _state = MQTT_CONNECTED;
194 return true;
195 } else {
196 _state = buffer[3];
197 }
198 }
199 _client->stop();
200 } else {
201 _state = MQTT_CONNECT_FAILED;
202 }
203 return false;
204 }
205 return true;
206}
207
208// reads a byte into result
209boolean PubSubClient::readByte(uint8_t * result) {
210 uint32_t previousMillis = millis();
211 while(!_client->available()) {
212 uint32_t currentMillis = millis();
213 if(currentMillis - previousMillis >= ((int32_t) MQTT_SOCKET_TIMEOUT * 1000)){
214 return false;
215 }
216 }
217 *result = _client->read();
218 return true;
219}
220
221// reads a byte into result[*index] and increments index
222boolean PubSubClient::readByte(uint8_t * result, uint16_t * index){
223 uint16_t current_index = *index;
224 uint8_t * write_address = &(result[current_index]);
225 if(readByte(write_address)){
226 *index = current_index + 1;
227 return true;
228 }
229 return false;
230}
231
232uint16_t PubSubClient::readPacket(uint8_t* lengthLength) {
233 uint16_t len = 0;
234 if(!readByte(buffer, &len)) return 0;
235 bool isPublish = (buffer[0]&0xF0) == MQTTPUBLISH;
236 uint32_t multiplier = 1;
237 uint16_t length = 0;
238 uint8_t digit = 0;
239 uint16_t skip = 0;
240 uint8_t start = 0;
241
242 do {
243 if(!readByte(&digit)) return 0;
244 buffer[len++] = digit;
245 length += (digit & 127) * multiplier;
246 multiplier *= 128;
247 } while ((digit & 128) != 0);
248 *lengthLength = len-1;
249
250 if (isPublish) {
251 // Read in topic length to calculate bytes to skip over for Stream writing
252 if(!readByte(buffer, &len)) return 0;
253 if(!readByte(buffer, &len)) return 0;
254 skip = (buffer[*lengthLength+1]<<8)+buffer[*lengthLength+2];
255 start = 2;
256 if (buffer[0]&MQTTQOS1) {
257 // skip message id
258 skip += 2;
259 }
260 }
261
262 for (uint16_t i = start;i<length;i++) {
263 if(!readByte(&digit)) return 0;
264 if (this->stream) {
265 if (isPublish && len-*lengthLength-2>skip) {
266 this->stream->write(digit);
267 }
268 }
269 if (len < MQTT_MAX_PACKET_SIZE) {
270 buffer[len] = digit;
271 }
272 len++;
273 }
274
275 if (!this->stream && len > MQTT_MAX_PACKET_SIZE) {
276 len = 0; // This will cause the packet to be ignored.
277 }
278
279 return len;
280}
281
282boolean PubSubClient::loop() {
283 if (connected()) {
284 unsigned long t = millis();
285 if ((t - lastInActivity > MQTT_KEEPALIVE*1000UL) || (t - lastOutActivity > MQTT_KEEPALIVE*1000UL)) {
286 if (pingOutstanding) {
287 this->_state = MQTT_CONNECTION_TIMEOUT;
288 _client->stop();
289 return false;
290 } else {
291 buffer[0] = MQTTPINGREQ;
292 buffer[1] = 0;
293 _client->write(buffer,2);
294 lastOutActivity = t;
295 lastInActivity = t;
296 pingOutstanding = true;
297 }
298 }
299 if (_client->available()) {
300 uint8_t llen;
301 uint16_t len = readPacket(&llen);
302 uint16_t msgId = 0;
303 uint8_t *payload;
304 if (len > 0) {
305 lastInActivity = t;
306 uint8_t type = buffer[0]&0xF0;
307 if (type == MQTTPUBLISH) {
308 if (callback) {
309 uint16_t tl = (buffer[llen+1]<<8)+buffer[llen+2];
310 char topic[tl+1];
311 for (uint16_t i=0;i<tl;i++) {
312 topic[i] = buffer[llen+3+i];
313 }
314 topic[tl] = 0;
315 // msgId only present for QOS>0
316 if ((buffer[0]&0x06) == MQTTQOS1) {
317 msgId = (buffer[llen+3+tl]<<8)+buffer[llen+3+tl+1];
318 payload = buffer+llen+3+tl+2;
319 callback(topic,payload,len-llen-3-tl-2);
320
321 buffer[0] = MQTTPUBACK;
322 buffer[1] = 2;
323 buffer[2] = (msgId >> 8);
324 buffer[3] = (msgId & 0xFF);
325 _client->write(buffer,4);
326 lastOutActivity = t;
327
328 } else {
329 payload = buffer+llen+3+tl;
330 callback(topic,payload,len-llen-3-tl);
331 }
332 }
333 } else if (type == MQTTPINGREQ) {
334 buffer[0] = MQTTPINGRESP;
335 buffer[1] = 0;
336 _client->write(buffer,2);
337 } else if (type == MQTTPINGRESP) {
338 pingOutstanding = false;
339 }
340 }
341 }
342 return true;
343 }
344 return false;
345}
346
347boolean PubSubClient::publish(const char* topic, const char* payload) {
348 return publish(topic,(const uint8_t*)payload,strlen(payload),false);
349}
350
351boolean PubSubClient::publish(const char* topic, const char* payload, boolean retained) {
352 return publish(topic,(const uint8_t*)payload,strlen(payload),retained);
353}
354
355boolean PubSubClient::publish(const char* topic, const uint8_t* payload, unsigned int plength) {
356 return publish(topic, payload, plength, false);
357}
358
359boolean PubSubClient::publish(const char* topic, const uint8_t* payload, unsigned int plength, boolean retained) {
360 if (connected()) {
361 if (MQTT_MAX_PACKET_SIZE < 5 + 2+strlen(topic) + plength) {
362 // Too long
363 return false;
364 }
365 // Leave room in the buffer for header and variable length field
366 uint16_t length = 5;
367 length = writeString(topic,buffer,length);
368 uint16_t i;
369 for (i=0;i<plength;i++) {
370 buffer[length++] = payload[i];
371 }
372 uint8_t header = MQTTPUBLISH;
373 if (retained) {
374 header |= 1;
375 }
376 return write(header,buffer,length-5);
377 }
378 return false;
379}
380
381boolean PubSubClient::publish_P(const char* topic, const uint8_t* payload, unsigned int plength, boolean retained) {
382 uint8_t llen = 0;
383 uint8_t digit;
384 unsigned int rc = 0;
385 uint16_t tlen;
386 unsigned int pos = 0;
387 unsigned int i;
388 uint8_t header;
389 unsigned int len;
390
391 if (!connected()) {
392 return false;
393 }
394
395 tlen = strlen(topic);
396
397 header = MQTTPUBLISH;
398 if (retained) {
399 header |= 1;
400 }
401 buffer[pos++] = header;
402 len = plength + 2 + tlen;
403 do {
404 digit = len % 128;
405 len = len / 128;
406 if (len > 0) {
407 digit |= 0x80;
408 }
409 buffer[pos++] = digit;
410 llen++;
411 } while(len>0);
412
413 pos = writeString(topic,buffer,pos);
414
415 rc += _client->write(buffer,pos);
416
417 for (i=0;i<plength;i++) {
418 rc += _client->write((char)pgm_read_byte_near(payload + i));
419 }
420
421 lastOutActivity = millis();
422
423 return rc == tlen + 4 + plength;
424}
425
426boolean PubSubClient::write(uint8_t header, uint8_t* buf, uint16_t length) {
427 uint8_t lenBuf[4];
428 uint8_t llen = 0;
429 uint8_t digit;
430 uint8_t pos = 0;
431 uint16_t rc;
432 uint16_t len = length;
433 do {
434 digit = len % 128;
435 len = len / 128;
436 if (len > 0) {
437 digit |= 0x80;
438 }
439 lenBuf[pos++] = digit;
440 llen++;
441 } while(len>0);
442
443 buf[4-llen] = header;
444 for (int i=0;i<llen;i++) {
445 buf[5-llen+i] = lenBuf[i];
446 }
447
448#ifdef MQTT_MAX_TRANSFER_SIZE
449 uint8_t* writeBuf = buf+(4-llen);
450 uint16_t bytesRemaining = length+1+llen; //Match the length type
451 uint8_t bytesToWrite;
452 boolean result = true;
453 while((bytesRemaining > 0) && result) {
454 bytesToWrite = (bytesRemaining > MQTT_MAX_TRANSFER_SIZE)?MQTT_MAX_TRANSFER_SIZE:bytesRemaining;
455 rc = _client->write(writeBuf,bytesToWrite);
456 result = (rc == bytesToWrite);
457 bytesRemaining -= rc;
458 writeBuf += rc;
459 }
460 return result;
461#else
462 rc = _client->write(buf+(4-llen),length+1+llen);
463 lastOutActivity = millis();
464 return (rc == 1+llen+length);
465#endif
466}
467
468boolean PubSubClient::subscribe(const char* topic) {
469 return subscribe(topic, 0);
470}
471
472boolean PubSubClient::subscribe(const char* topic, uint8_t qos) {
473 if (qos < 0 || qos > 1) {
474 return false;
475 }
476 if (MQTT_MAX_PACKET_SIZE < 9 + strlen(topic)) {
477 // Too long
478 return false;
479 }
480 if (connected()) {
481 // Leave room in the buffer for header and variable length field
482 uint16_t length = 5;
483 nextMsgId++;
484 if (nextMsgId == 0) {
485 nextMsgId = 1;
486 }
487 buffer[length++] = (nextMsgId >> 8);
488 buffer[length++] = (nextMsgId & 0xFF);
489 length = writeString((char*)topic, buffer,length);
490 buffer[length++] = qos;
491 return write(MQTTSUBSCRIBE|MQTTQOS1,buffer,length-5);
492 }
493 return false;
494}
495
496boolean PubSubClient::unsubscribe(const char* topic) {
497 if (MQTT_MAX_PACKET_SIZE < 9 + strlen(topic)) {
498 // Too long
499 return false;
500 }
501 if (connected()) {
502 uint16_t length = 5;
503 nextMsgId++;
504 if (nextMsgId == 0) {
505 nextMsgId = 1;
506 }
507 buffer[length++] = (nextMsgId >> 8);
508 buffer[length++] = (nextMsgId & 0xFF);
509 length = writeString(topic, buffer,length);
510 return write(MQTTUNSUBSCRIBE|MQTTQOS1,buffer,length-5);
511 }
512 return false;
513}
514
515void PubSubClient::disconnect() {
516 buffer[0] = MQTTDISCONNECT;
517 buffer[1] = 0;
518 _client->write(buffer,2);
519 _state = MQTT_DISCONNECTED;
520 _client->stop();
521 lastInActivity = lastOutActivity = millis();
522}
523
524uint16_t PubSubClient::writeString(const char* string, uint8_t* buf, uint16_t pos) {
525 const char* idp = string;
526 uint16_t i = 0;
527 pos += 2;
528 while (*idp) {
529 buf[pos++] = *idp++;
530 i++;
531 }
532 buf[pos-i-2] = (i >> 8);
533 buf[pos-i-1] = (i & 0xFF);
534 return pos;
535}
536
537
538boolean PubSubClient::connected() {
539 boolean rc;
540 if (_client == NULL ) {
541 rc = false;
542 } else {
543 rc = (int)_client->connected();
544 if (!rc) {
545 if (this->_state == MQTT_CONNECTED) {
546 this->_state = MQTT_CONNECTION_LOST;
547 _client->flush();
548 _client->stop();
549 }
550 }
551 }
552 return rc;
553}
554
555PubSubClient& PubSubClient::setServer(uint8_t * ip, uint16_t port) {
556 IPAddress addr(ip[0],ip[1],ip[2],ip[3]);
557 return setServer(addr,port);
558}
559
560PubSubClient& PubSubClient::setServer(IPAddress ip, uint16_t port) {
561 this->ip = ip;
562 this->port = port;
563 this->domain = NULL;
564 return *this;
565}
566
567PubSubClient& PubSubClient::setServer(const char * domain, uint16_t port) {
568 this->domain = domain;
569 this->port = port;
570 return *this;
571}
572
573PubSubClient& PubSubClient::setCallback(MQTT_CALLBACK_SIGNATURE) {
574 this->callback = callback;
575 return *this;
576}
577
578PubSubClient& PubSubClient::setClient(Client& client){
579 this->_client = &client;
580 return *this;
581}
582
583PubSubClient& PubSubClient::setStream(Stream& stream){
584 this->stream = &stream;
585 return *this;
586}
587
588int PubSubClient::state() {
589 return this->_state;
590}
Note: See TracBrowser for help on using the repository browser.