source: rtos_arduino/trunk/arduino_lib/libraries/PubNub/PubNub.cpp@ 211

Last change on this file since 211 was 211, checked in by ertl-honda, 8 years ago

公開されているコードを追加

File size: 11.5 KB
Line 
1#if defined(__AVR)
2#include <avr/pgmspace.h>
3#else
4#define strncasecmp_P(a, b, c) strncasecmp(a, b, c)
5#endif
6
7#include <ctype.h>
8#include "PubNub.h"
9
/* Uncomment to get diagnostic output on Serial. */
// #define PUBNUB_DEBUG 1

/* Debug print helpers. They forward all arguments to Serial.print() /
 * Serial.println() when PUBNUB_DEBUG is defined and expand to nothing
 * otherwise. Standard variadic macros (__VA_ARGS__) are used instead of
 * the GNU-only named form "x...". */
#ifdef PUBNUB_DEBUG
#define DBGprint(...) Serial.print(__VA_ARGS__)
#define DBGprintln(...) Serial.println(__VA_ARGS__)
#else
#define DBGprint(...)
#define DBGprintln(...)
#endif
19
20/* There are some special considerations when using the WiFi library,
21 * compared to the Ethernet library:
22 *
23 * (i) The client object may return stale data from previous connection,
24 * so we should call .flush() after initiating a connection.
25 *
26 * (ii) It appears .stop() does not block on really terminating
27 * the connection, do that manually.
28 *
29 * (iii) Data may still be available while connected() returns false
30 * already; use available() test on a lot of places where we used
31 * connected() before.
32 */
33
/* Global object that shares its name with its class; the elaborated
 * specifier "class PubNub" names the type, the plain name then refers
 * to this instance. */
class PubNub PubNub;
35
36bool PubNub::begin(const char *publish_key_, const char *subscribe_key_, const char *origin_)
37{
38 publish_key = publish_key_;
39 subscribe_key = subscribe_key_;
40 origin = origin_;
41 uuid = NULL;
42 auth = NULL;
43}
44
45void PubNub::set_uuid(const char *uuid_)
46{
47 uuid = uuid_;
48}
49
50void PubNub::set_auth(const char *auth_)
51{
52 auth = auth_;
53}
54
55PubNub_BASE_CLIENT *PubNub::publish(const char *channel, const char *message, int timeout)
56{
57 PubNub_BASE_CLIENT &client = publish_client;
58 unsigned long t_start;
59 int have_param = 0;
60
61retry:
62 t_start = millis();
63 /* connect() timeout is about 30s, much lower than our usual
64 * timeout is. */
65 if (!client.connect(origin, 80)) {
66 DBGprintln("Connection error");
67 client.stop();
68 return NULL;
69 }
70
71 client.flush();
72 client.print("GET /publish/");
73 client.print(publish_key);
74 client.print("/");
75 client.print(subscribe_key);
76 client.print("/0/");
77 client.print(channel);
78 client.print("/0/");
79
80 /* Inject message, URI-escaping it in the process.
81 * We are careful to save RAM by not using any copies
82 * of the string or explicit buffers. */
83 const char *pmessage = message;
84 while (pmessage[0]) {
85 /* RFC 3986 Unreserved characters plus few
86 * safe reserved ones. */
87 size_t okspan = strspn(pmessage, "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-_.~" ",=:;@[]");
88 if (okspan > 0) {
89 client.write((const uint8_t *) pmessage, okspan);
90 pmessage += okspan;
91 }
92 if (pmessage[0]) {
93 /* %-encode a non-ok character. */
94 char enc[3] = {'%'};
95 enc[1] = "0123456789ABCDEF"[pmessage[0] / 16];
96 enc[2] = "0123456789ABCDEF"[pmessage[0] % 16];
97 client.write((const uint8_t *) enc, 3);
98 pmessage++;
99 }
100 }
101
102 if (auth) {
103 client.print(have_param ? '&' : '?');
104 client.print("auth=");
105 client.print(auth);
106 have_param = 1;
107 }
108
109 enum PubNub_BH ret = this->_request_bh(client, t_start, timeout, have_param ? '&' : '?');
110 switch (ret) {
111 case PubNub_BH_OK:
112 /* Success and reached body, return handle to the client
113 * for further perusal. */
114 return &client;
115 case PubNub_BH_ERROR:
116 /* Failure. */
117 client.stop();
118 while (client.connected()) ;
119 return NULL;
120 case PubNub_BH_TIMEOUT:
121 /* Time out. Try again. */
122 client.stop();
123 while (client.connected()) ;
124 goto retry;
125 }
126}
127
128PubSubClient *PubNub::subscribe(const char *channel, int timeout)
129{
130 PubSubClient &client = subscribe_client;
131 unsigned long t_start;
132 int have_param = 0;
133
134retry:
135 t_start = millis();
136 /* connect() timeout is about 30s, much lower than our usual
137 * timeout is. */
138 if (!client.connect(origin, 80)) {
139 DBGprintln("Connection error");
140 client.stop();
141 return NULL;
142 }
143
144 client.flush();
145 client.print("GET /subscribe/");
146 client.print(subscribe_key);
147 client.print("/");
148 client.print(channel);
149 client.print("/0/");
150 client.print(client.server_timetoken());
151 if (uuid) {
152 client.print("?uuid=");
153 client.print(uuid);
154 have_param = 1;
155 }
156 if (auth) {
157 client.print(have_param ? '&' : '?');
158 client.print("auth=");
159 client.print(auth);
160 have_param = 1;
161 }
162
163 enum PubNub_BH ret = this->_request_bh(client, t_start, timeout, have_param ? '&' : '?');
164 switch (ret) {
165 case PubNub_BH_OK:
166 /* Success and reached body. We need to eat '[' first,
167 * as our API contract is to return only the "message body"
168 * part of reply from subscribe. */
169 if (!client.wait_for_data()
170 || client.read() != '[') {
171 /* Something unexpected. */
172 DBGprintln("Unexpected body in subscribe");
173 client.stop();
174 while (client.connected()) ;
175 return NULL;
176 }
177 /* Now return handle to the client for further perusal.
178 * PubSubClient class will make sure that the client does
179 * not see the time token but we stop right after the
180 * message body. */
181 client.start_body();
182 return &client;
183
184 case PubNub_BH_ERROR:
185 /* Failure. */
186 client.stop();
187 while (client.connected()) ;
188 return NULL;
189
190 case PubNub_BH_TIMEOUT:
191 /* Time out. Try again. */
192 client.stop();
193 while (client.connected()) ;
194 goto retry;
195 }
196}
197
198PubNub_BASE_CLIENT *PubNub::history(const char *channel, int limit, int timeout)
199{
200 PubNub_BASE_CLIENT &client = history_client;
201 unsigned long t_start;
202
203retry:
204 t_start = millis();
205 if (!client.connect(origin, 80)) {
206 DBGprintln("Connection error");
207 client.stop();
208 return NULL;
209 }
210
211 client.flush();
212 client.print("GET /history/");
213 client.print(subscribe_key);
214 client.print("/");
215 client.print(channel);
216 client.print("/0/");
217 client.print(limit, DEC);
218
219 enum PubNub_BH ret = this->_request_bh(client, t_start, timeout, '?');
220 switch (ret) {
221 case PubNub_BH_OK:
222 /* Success and reached body, return handle to the client
223 * for further perusal. */
224 return &client;
225 case PubNub_BH_ERROR:
226 /* Failure. */
227 client.stop();
228 while (client.connected()) ;
229 return NULL;
230 case PubNub_BH_TIMEOUT:
231 /* Time out. Try again. */
232 client.stop();
233 while (client.connected()) ;
234 goto retry;
235 }
236}
237
238enum PubNub_BH PubNub::_request_bh(PubNub_BASE_CLIENT &client, unsigned long t_start, int timeout, char qparsep)
239{
240 /* Finish the first line of the request. */
241 client.print(qparsep);
242 client.print("pnsdk=PubNub-Arduino/1.0 HTTP/1.1\r\n");
243 /* Finish HTTP request. */
244 client.print("Host: ");
245 client.print(origin);
246 client.print("\r\nUser-Agent: PubNub-Arduino/1.0\r\nConnection: close\r\n\r\n");
247
248#define WAIT() do { \
249 while (!client.available()) { \
250 /* wait, just check for timeout */ \
251 if (millis() - t_start > (unsigned long) timeout * 1000) { \
252 DBGprintln("Timeout in bottom half"); \
253 return PubNub_BH_TIMEOUT; \
254 } \
255 if (!client.connected()) { \
256 /* Oops, connection interrupted. */ \
257 DBGprintln("Connection reset in bottom half"); \
258 return PubNub_BH_ERROR; \
259 } \
260 } \
261} while (0)
262
263 /* Read first line with HTTP code. */
264 /* "HTTP/1.x " */
265 do {
266 WAIT();
267 } while (client.read() != ' ');
268 /* Now, first digit of HTTP code. */
269 WAIT();
270 char c = client.read();
271 if (c != '2') {
272 /* HTTP code that is NOT 2xx means trouble.
273 * kthxbai */
274 DBGprint("Wrong HTTP status first digit ");
275 DBGprint((int) c, DEC);
276 DBGprintln(" in bottom half");
277 return PubNub_BH_ERROR;
278 }
279
280 /* Now, we enter in a state machine that shall guide us through
281 * the remaining headers to the beginning of the body. */
282 enum {
283 RS_SKIPLINE, /* Skip the rest of this line. */
284 RS_LOADLINE, /* Try loading the line in a buffer. */
285 } request_state = RS_SKIPLINE; /* Skip the rest of status line first. */
286 bool chunked = false;
287
288 while (client.connected() || client.available()) {
289 /* Let's hope there is no stray LF without CR. */
290 if (request_state == RS_SKIPLINE) {
291 do {
292 WAIT();
293 } while (client.read() != '\n');
294 request_state = RS_LOADLINE;
295
296 } else { /* request_state == RS_LOADLINE */
297 /* line[] must be enough to hold
298 * Transfer-Encoding: chunked (or \r\n) */
299 const static char PROGMEM chunked_str[] = "Transfer-Encoding: chunked\r\n";
300 char line[sizeof(chunked_str)]; /* Not NUL-terminated! */
301 int linelen = 0;
302 char ch = 0;
303 do {
304 WAIT();
305 ch = client.read();
306 line[linelen++] = ch;
307 if (linelen == strlen_P(chunked_str)
308 && !strncasecmp_P(line, chunked_str, linelen)) {
309 /* Chunked encoding header. */
310 chunked = true;
311 break;
312 }
313 } while (ch != '\n' && linelen < sizeof(line));
314 if (ch != '\n') {
315 /* We are not at the end of the line yet.
316 * Skip the rest of the line. */
317 request_state = RS_SKIPLINE;
318 } else if (linelen == 2 && line[0] == '\r') {
319 /* Empty line. This means headers end. */
320 break;
321 }
322 }
323 }
324
325 if (chunked) {
326 /* There is one extra line due to Transfer-encoding: chunked.
327 * Our minimalistic support means that we hope for just
328 * a single chunk, just skip the first line after header. */
329 do {
330 WAIT();
331 } while (client.read() != '\n');
332 }
333
334 /* Body begins now. */
335 return PubNub_BH_OK;
336}
337
338
339int PubSubClient::read()
340{
341 int c = PubNub_BASE_CLIENT::read();
342 if (!json_enabled || c == -1)
343 return c;
344
345 this->_state_input(c, NULL, 0);
346 return c;
347}
348
349int PubSubClient::read(uint8_t *buf, size_t size)
350{
351 int len = PubNub_BASE_CLIENT::read(buf, size);
352 if (!json_enabled || len <= 0)
353 return len;
354 for (int i = 0; i < len; i++) {
355 this->_state_input(buf[i], &buf[i+1], len - i - 1);
356 if (!available() && !connected()) {
357 /* We have hit the end somewhere in this buffer.
358 * From user perspective, only characters up to
359 * index i are valid. */
360 return i + 1;
361 }
362 }
363 return len;
364}
365
366bool PubSubClient::wait_for_data(int timeout)
367{
368 unsigned long t_start = millis();
369 while (connected() && !available()) {
370 if (millis() - t_start > (unsigned long) timeout * 1000)
371 return false; /* Time out. */
372 }
373 return available();
374}
375
376void PubSubClient::stop()
377{
378 if ((!available() && !connected()) || !json_enabled) {
379 PubNub_BASE_CLIENT::stop();
380 return;
381 }
382 /* We are still connected. Read the rest of the stream so that
383 * we catch the timetoken. */
384 while (wait_for_data()) {
385 char ch = read();
386 this->_state_input(ch, NULL, 0);
387 }
388 json_enabled = false;
389}
390
391void PubSubClient::start_body()
392{
393 json_enabled = true;
394 in_string = after_backslash = false;
395 braces_depth = 0;
396}
397
398void PubSubClient::_state_input(uint8_t ch, uint8_t *nextbuf, size_t nextsize)
399{
400 /* Process a single character on input, updating the JSON
401 * state machine. If we reached the last character of input
402 * (just before expected ","), we will eat the rest of the body,
403 * update timetoken and close the connection. */
404 if (in_string) {
405 if (after_backslash) {
406 /* Whatever this is... */
407 after_backslash = false;
408 return;
409 }
410 switch (ch) {
411 case '"':
412 in_string = false;
413 if (braces_depth == 0)
414 goto body_end;
415 return;
416 case '\\':
417 after_backslash = true;
418 return;
419 default:
420 return;
421 }
422
423 } else {
424 switch (ch) {
425 case '"':
426 in_string = true;
427 return;
428 case '{':
429 case '[':
430 braces_depth++;
431 return;
432 case '}':
433 case ']':
434 braces_depth--;
435 if (braces_depth <= 0)
436 goto body_end;
437 return;
438 default:
439 return;
440 }
441 }
442
443 return;
444body_end:
445 /* End of data here. */
446 this->_grab_timetoken(nextbuf, nextsize);
447}
448
449void PubSubClient::_grab_timetoken(uint8_t *nextbuf, size_t nextsize)
450{
451 char new_timetoken[22];
452 size_t new_timetoken_len = 0;
453
454 /* Expected followup now is:
455 * ,"13511688131075270"]
456 */
457 /* Somewhat unwieldy macros that will first exhaust nextbuf[],
458 * then read directly from socket. */
459#define WAIT() do { \
460 if (nextsize > 0) \
461 break; \
462 if (!wait_for_data()) { \
463 /* Oops, connection interrupted. */ \
464 return; \
465 } \
466} while (0)
467#define GETCHAR() (nextsize > 0 ? (nextsize--, *nextbuf++) : read())
468 do { WAIT(); } while (GETCHAR() != ',');
469 do { WAIT(); } while (GETCHAR() != '"');
470 do {
471 WAIT();
472 char ch = GETCHAR();
473 if (ch == '"')
474 break;
475 new_timetoken[new_timetoken_len++] = ch;
476 if (new_timetoken_len >= sizeof(new_timetoken) - 1)
477 break;
478 } while (1);
479 memcpy(timetoken, new_timetoken, new_timetoken_len);
480 timetoken[new_timetoken_len] = 0;
481}
Note: See TracBrowser for help on using the repository browser.