62161e79fc291a9025a945082fa7bfac0148a72e
1 /**
2 * collectd - src/ceph.c
3 * Copyright (C) 2011 New Dream Network
4 *
5 * This program is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License as published by the
7 * Free Software Foundation; only version 2 of the License is applicable.
8 *
9 * This program is distributed in the hope that it will be useful, but
10 * WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License along
15 * with this program; if not, write to the Free Software Foundation, Inc.,
16 * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
17 *
18 * Authors:
19 * Colin McCabe <cmccabe@alumni.cmu.edu>
20 * Dennis Zou <yunzou@cisco.com>
21 * Dan Ryder <daryder@cisco.com>
22 **/
#define _BSD_SOURCE

#include "collectd.h"
#include "common.h"
#include "plugin.h"

#include <arpa/inet.h>
#include <ctype.h>
#include <errno.h>
#include <fcntl.h>
#include <inttypes.h>
#include <json/json.h>
#include <json/json_object_private.h> /* need for struct json_object_iter */
#include <limits.h>
#include <math.h>
#include <poll.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/un.h>
#include <unistd.h>
/** Size in bytes (including the terminating NUL) of the buffers that hold
 * data-source names. compact_ds_name() keeps generated names within
 * MAX_RRD_DS_NAME_LEN - 1 characters to respect RRD's data-source name
 * length limitation. */
#define MAX_RRD_DS_NAME_LEN 20
/** Evaluate EXPR, storing its result in RET, and evaluate it again for as
 * long as it fails with EINTR.
 *
 * On exit RET holds either the (non-negative) result of EXPR, or -errno if
 * EXPR returned a negative value for any reason other than EINTR.
 *
 * The body is wrapped in do { } while (0) so that the macro behaves as a
 * single statement, e.g. inside an unbraced if/else. Both arguments are
 * parenthesized in the expansion.
 */
#define RETRY_ON_EINTR(ret, expr) \
    do { \
        while (1) { \
            (ret) = (expr); \
            if ((ret) >= 0) \
                break; \
            (ret) = -errno; \
            if ((ret) != -EINTR) \
                break; \
        } \
    } while (0)
/** Timeout interval in seconds: cconn_main_loop() gives up on all
 * outstanding connections once this much time has passed since it started. */
#define CEPH_TIMEOUT_INTERVAL 1
/** Maximum path length for a UNIX domain socket on this system: the size of
 * sockaddr_un.sun_path, including the terminating NUL. The operand of sizeof
 * is not evaluated, so the null-pointer member access is never performed. */
#define UNIX_DOMAIN_SOCK_PATH_MAX (sizeof(((struct sockaddr_un*)0)->sun_path))
/******* ceph_daemon *******/
/** One configured Ceph daemon and the counter layout learned from its
 * schema. */
struct ceph_daemon
{
    /** Version of the admin_socket interface (read from the daemon) */
    uint32_t version;
    /** Daemon name from the `Daemon` config block; also used as the
     * plugin_instance of dispatched values */
    char name[DATA_MAX_NAME_LEN];
    /** Number of entries in dset and in pc_types */
    int dset_num;

    /** Path to the socket that we use to talk to the ceph daemon */
    char asok_path[UNIX_DOMAIN_SOCK_PATH_MAX];

    /** Dynamically allocated array of dset_num data sets, one per distinct
     * first component of the daemon's counter keys (see parse_keys()):
     *   dset[i].type    data-set name
     *   dset[i].ds_num  number of data sources (counters) in the set
     *   dset[i].ds      dynamically allocated array of data sources
     */
    struct data_set_s *dset;
    /** pc_types[i][j] holds the perf-counter type flags
     * (enum perfcounter_type_d) of dset[i].ds[j]; both levels are
     * dynamically allocated. */
    int **pc_types;
};
/** Perf-counter type flags, as found in the integer values of the daemon's
 * schema. Only these two bits are interpreted by this plugin; a counter
 * with neither bit set is dispatched as a plain gauge. */
enum perfcounter_type_d
{
    /** Value is an object with "avgcount" and "sum"; the plugin dispatches
     * the average of sum over avgcount since the previous poll as a gauge. */
    PERFCOUNTER_LATENCY = 0x4,
    /** Value is dispatched as DS_TYPE_DERIVE. */
    PERFCOUNTER_DERIVE = 0x8,
};
/** Array of daemons to monitor, appended to by cc_add_daemon_config(); each
 * element is heap-allocated and released by ceph_shutdown(). */
static struct ceph_daemon **g_daemons = NULL;

/** Number of elements in g_daemons */
static int g_num_daemons = 0;
100 static void ceph_daemon_print(const struct ceph_daemon *d)
101 {
102 DEBUG("name=%s, asok_path=%s", d->name, d->asok_path);
103 }
105 static void ceph_daemons_print(void)
106 {
107 int i;
108 for (i = 0; i < g_num_daemons; ++i)
109 {
110 ceph_daemon_print(g_daemons[i]);
111 }
112 }
/** Readings from the previous poll, used by get_last_avg() to turn
 * cumulative (sum, avgcount) pairs into per-interval averages. Grown one
 * slot at a time by update_last(); the entries and the array are freed in
 * ceph_shutdown(). */
struct last_data **last_poll_data = NULL;
/** Number of valid entries in last_poll_data */
int last_idx = 0;
117 /*static void ceph_daemon_free(struct ceph_daemon *d)
118 {
119 plugin_unregister_data_set(d->dset.type);
120 sfree(d->dset.ds);
121 sfree(d);
122 }*/
123 static void ceph_daemon_free(struct ceph_daemon *d)
124 {
125 int i = 0;
126 for (; i < d->dset_num; i++)
127 {
128 plugin_unregister_data_set((d->dset + i)->type);
129 sfree(d->dset->ds);
130 sfree(d->pc_types[i]);
131 }
132 sfree(d->dset);
133 sfree(d->pc_types);
134 sfree(d);
135 }
137 static void compact_ds_name(char *source, char *dest)
138 {
139 int keys_num = 0, i;
140 char *save_ptr = NULL, *tmp_ptr = source;
141 char *keys[16];
142 char len_str[3];
143 char tmp[DATA_MAX_NAME_LEN];
144 int reserved = 0;
145 int offset = 0;
146 memset(tmp, 0, sizeof(tmp));
147 if (source == NULL || dest == NULL || source[0] == '\0' || dest[0] != '\0')
148 {
149 return;
150 }
151 size_t src_len = strlen(source);
152 snprintf(len_str, sizeof(len_str), "%zu", src_len);
153 unsigned char append_status = 0x0;
154 append_status |= (source[src_len - 1] == '-') ? 0x1 : 0x0;
155 append_status |= (source[src_len - 1] == '+') ? 0x2 : 0x0;
156 while ((keys[keys_num] = strtok_r(tmp_ptr, ":_-+", &save_ptr)) != NULL)
157 {
158 tmp_ptr = NULL;
159 /** capitalize 1st char **/
160 keys[keys_num][0] = toupper(keys[keys_num][0]);
161 keys_num++;
162 if (keys_num >= 16)
163 break;
164 }
165 /** concatenate each part of source string **/
166 for (i = 0; i < keys_num; i++)
167 {
168 strcat(tmp, keys[i]);
169 }
170 tmp[DATA_MAX_NAME_LEN - 1] = '\0';
171 /** to coordinate limitation of length of ds name from RRD
172 * we will truncate ds_name
173 * when the its length is more than
174 * MAX_RRD_DS_NAME_LEN
175 */
176 if (strlen(tmp) > MAX_RRD_DS_NAME_LEN - 1)
177 {
178 append_status |= 0x4;
179 /** we should reserve space for
180 * len_str
181 */
182 reserved += 2;
183 }
184 if (append_status & 0x1)
185 {
186 /** we should reserve space for
187 * "Minus"
188 */
189 reserved += 5;
190 }
191 if (append_status & 0x2)
192 {
193 /** we should reserve space for
194 * "Plus"
195 */
196 reserved += 4;
197 }
198 snprintf(dest, MAX_RRD_DS_NAME_LEN - reserved, "%s", tmp);
199 offset = strlen(dest);
200 switch (append_status)
201 {
202 case 0x1:
203 memcpy(dest + offset, "Minus", 5);
204 break;
205 case 0x2:
206 memcpy(dest + offset, "Plus", 5);
207 break;
208 case 0x4:
209 memcpy(dest + offset, len_str, 2);
210 break;
211 case 0x5:
212 memcpy(dest + offset, "Minus", 5);
213 memcpy(dest + offset + 5, len_str, 2);
214 break;
215 case 0x6:
216 memcpy(dest + offset, "Plus", 4);
217 memcpy(dest + offset + 4, len_str, 2);
218 break;
219 default:
220 break;
221 }
222 }
223 static int parse_keys(const char *key_str, char *dset_name, char *ds_name)
224 {
225 char *ptr, *rptr;
226 size_t dset_name_len = 0;
227 size_t ds_name_len = 0;
228 char tmp_ds_name[DATA_MAX_NAME_LEN];
229 memset(tmp_ds_name, 0, sizeof(tmp_ds_name));
230 if (dset_name == NULL || ds_name == NULL || key_str == NULL
231 || key_str[0] == '\0' || dset_name[0] != '\0' || ds_name[0] != '\0')
232 {
233 return -1;
234 }
235 if ((ptr = strchr(key_str, '.')) == NULL
236 || (rptr = strrchr(key_str, '.')) == NULL)
237 {
238 strncpy(dset_name, key_str, DATA_MAX_NAME_LEN - 1);
239 strncpy(tmp_ds_name, key_str, DATA_MAX_NAME_LEN - 1);
240 goto compact;
241 }
242 dset_name_len =
243 (ptr - key_str) > (DATA_MAX_NAME_LEN - 1) ?
244 (DATA_MAX_NAME_LEN - 1) : (ptr - key_str);
245 memcpy(dset_name, key_str, dset_name_len);
246 ds_name_len =
247 (rptr - ptr) > DATA_MAX_NAME_LEN ? DATA_MAX_NAME_LEN : (rptr - ptr);
248 if (ds_name_len == 0)
249 { /** only have two keys **/
250 if (!strncmp(rptr + 1, "type", 4))
251 {/** if last key is "type",ignore **/
252 strncpy(tmp_ds_name, dset_name, DATA_MAX_NAME_LEN - 1);
253 }
254 else
255 {/** if last key isn't "type", copy last key **/
256 strncpy(tmp_ds_name, rptr + 1, DATA_MAX_NAME_LEN - 1);
257 }
258 }
259 else if (!strncmp(rptr + 1, "type", 4))
260 {/** more than two keys **/
261 memcpy(tmp_ds_name, ptr + 1, ds_name_len - 1);
262 }
263 else
264 {/** copy whole keys **/
265 strncpy(tmp_ds_name, ptr + 1, DATA_MAX_NAME_LEN - 1);
266 }
267 compact: compact_ds_name(tmp_ds_name, ds_name);
268 return 0;
269 }
271 int get_matching_dset(const struct ceph_daemon *d, const char *name)
272 {
273 int idx;
274 for (idx = 0; idx < d->dset_num; ++idx)
275 {
276 if (strcmp(d->dset[idx].type, name) == 0)
277 {
278 return idx;
279 }
280 }
281 return -1;
282 }
284 int get_matching_value(const struct data_set_s *dset, const char *name,
285 int num_values)
286 {
287 int idx;
288 for (idx = 0; idx < num_values; ++idx)
289 {
290 if (strcmp(dset->ds[idx].name, name) == 0)
291 {
292 return idx;
293 }
294 }
295 return -1;
296 }
298 static int ceph_daemon_add_ds_entry(struct ceph_daemon *d, const char *name,
299 int pc_type)
300 {
301 struct data_source_s *ds;
302 struct data_set_s *dset;
303 struct data_set_s *dset_array;
304 int **pc_types_array = NULL;
305 int *pc_types;
306 int *pc_types_new;
307 int idx = 0;
308 if (strlen(name) + 1 > DATA_MAX_NAME_LEN)
309 return -ENAMETOOLONG;
310 char dset_name[DATA_MAX_NAME_LEN];
311 char ds_name[MAX_RRD_DS_NAME_LEN];
312 memset(dset_name, 0, sizeof(dset_name));
313 memset(ds_name, 0, sizeof(ds_name));
314 if (parse_keys(name, dset_name, ds_name))
315 return 1;
316 idx = get_matching_dset(d, dset_name);
317 if (idx == -1)
318 {/* need to add a dset **/
319 dset_array = realloc(d->dset,
320 sizeof(struct data_set_s) * (d->dset_num + 1));
321 if (!dset_array)
322 return -ENOMEM;
323 pc_types_array = realloc(d->pc_types,
324 sizeof(int *) * (d->dset_num + 1));
325 if (!pc_types_array)
326 return -ENOMEM;
327 dset = &dset_array[d->dset_num];
328 /** this step is very important, otherwise,
329 * realloc for dset->ds will tricky because of
330 * a random addr in dset->ds
331 */
332 memset(dset, 0, sizeof(struct data_set_s));
333 dset->ds_num = 0;
334 snprintf(dset->type, DATA_MAX_NAME_LEN, "%s", dset_name);
335 pc_types = pc_types_array[d->dset_num] = NULL;
336 d->dset = dset_array;
337 }
338 else
339 {
340 dset = &d->dset[idx];
341 pc_types = d->pc_types[idx];
342 }
343 struct data_source_s *ds_array = realloc(dset->ds,
344 sizeof(struct data_source_s) * (dset->ds_num + 1));
345 if (!ds_array)
346 {
347 return -ENOMEM;
348 }
349 pc_types_new = realloc(pc_types, sizeof(int) * (dset->ds_num + 1));
350 if (!pc_types_new)
351 {
352 return -ENOMEM;
353 }
354 dset->ds = ds_array;
355 if (idx == -1)
356 {
357 pc_types_array[d->dset_num] = pc_types_new;
358 d->pc_types = pc_types_array;
359 d->pc_types[d->dset_num][dset->ds_num] = pc_type;
360 d->dset_num++;
361 }
362 else
363 {
364 d->pc_types[idx] = pc_types_new;
365 d->pc_types[idx][dset->ds_num] = pc_type;
366 }
367 ds = &ds_array[dset->ds_num++];
368 snprintf(ds->name, MAX_RRD_DS_NAME_LEN, "%s", ds_name);
369 ds->type =
370 (pc_type & PERFCOUNTER_DERIVE) ? DS_TYPE_DERIVE : DS_TYPE_GAUGE;
371 ds->min = 0;
372 ds->max = NAN;
373 return 0;
374 }
376 /******* ceph_config *******/
377 static int cc_handle_str(struct oconfig_item_s *item, char *dest, int dest_len)
378 {
379 const char *val;
380 if (item->values_num != 1)
381 {
382 return -ENOTSUP;
383 }
384 if (item->values[0].type != OCONFIG_TYPE_STRING)
385 {
386 return -ENOTSUP;
387 }
388 val = item->values[0].value.string;
389 if (snprintf(dest, dest_len, "%s", val) > (dest_len - 1))
390 {
391 ERROR("ceph plugin: configuration parameter '%s' is too long.\n",
392 item->key);
393 return -ENAMETOOLONG;
394 }
395 return 0;
396 }
398 static int cc_add_daemon_config(oconfig_item_t *ci)
399 {
400 int ret, i;
401 struct ceph_daemon *array, *nd, cd;
402 memset(&cd, 0, sizeof(struct ceph_daemon));
404 if ((ci->values_num != 1) || (ci->values[0].type != OCONFIG_TYPE_STRING))
405 {
406 WARNING("ceph plugin: `Daemon' blocks need exactly one string argument.");
407 return (-1);
408 }
410 ret = cc_handle_str(ci, cd.name, DATA_MAX_NAME_LEN);
411 if (ret)
412 return ret;
414 for (i=0; i < ci->children_num; i++)
415 {
416 oconfig_item_t *child = ci->children + i;
418 if (strcasecmp("SocketPath", child->key) == 0)
419 {
420 ret = cc_handle_str(child, cd.asok_path, sizeof(cd.asok_path));
421 if (ret)
422 return ret;
423 }
424 else
425 {
426 WARNING("ceph plugin: ignoring unknown option %s", child->key);
427 }
428 }
429 if (cd.name[0] == '\0')
430 {
431 ERROR("ceph plugin: you must configure a daemon name.\n");
432 return -EINVAL;
433 }
434 else if (cd.asok_path[0] == '\0')
435 {
436 ERROR("ceph plugin(name=%s): you must configure an administrative "
437 "socket path.\n", cd.name);
438 return -EINVAL;
439 }
440 else if (!((cd.asok_path[0] == '/')
441 || (cd.asok_path[0] == '.' && cd.asok_path[1] == '/')))
442 {
443 ERROR("ceph plugin(name=%s): administrative socket paths must begin with "
444 "'/' or './' Can't parse: '%s'\n", cd.name, cd.asok_path);
445 return -EINVAL;
446 }
447 array = realloc(g_daemons,
448 sizeof(struct ceph_daemon *) * (g_num_daemons + 1));
449 if (array == NULL)
450 {
451 /* The positive return value here indicates that this is a
452 * runtime error, not a configuration error. */
453 return ENOMEM;
454 }
455 g_daemons = (struct ceph_daemon**) array;
456 nd = malloc(sizeof(struct ceph_daemon));
457 if (!nd)
458 return ENOMEM;
459 memcpy(nd, &cd, sizeof(struct ceph_daemon));
460 g_daemons[g_num_daemons++] = nd;
461 return 0;
462 }
464 static int ceph_config(oconfig_item_t *ci)
465 {
466 int ret, i;
468 for (i = 0; i < ci->children_num; ++i)
469 {
470 oconfig_item_t *child = ci->children + i;
471 if (strcasecmp("Daemon", child->key) == 0)
472 {
473 ret = cc_add_daemon_config(child);
474 if (ret)
475 return ret;
476 }
477 else
478 {
479 WARNING("ceph plugin: ignoring unknown option %s", child->key);
480 }
481 }
482 return 0;
483 }
485 /******* JSON parsing *******/
/** Callback invoked by traverse_json() for each member of each visited JSON
 * object. Its arguments are the opaque handler argument, the member's value,
 * and the member's full dotted key path (e.g. "a.b.c").
 * Return 1 to descend into the value (only effective for objects), or 0 to
 * continue with the next member. Any other value aborts iteration of the
 * enclosing object and is returned from that level of the traversal. */
typedef int (*node_handler_t)(void*, json_object*, const char*);
488 /** Perform a depth-first traversal of the JSON parse tree,
489 * calling node_handler at each node.*/
490 static int traverse_json_impl(json_object *jo, char *key, int max_key,
491 node_handler_t handler, void *handler_arg)
492 {
493 struct json_object_iter iter;
494 int ret, plen, klen;
496 if (json_object_get_type(jo) != json_type_object)
497 return 0;
498 plen = strlen(key);
499 json_object_object_foreachC(jo, iter)
500 {
501 klen = strlen(iter.key);
502 if (plen + klen + 2 > max_key)
503 return -ENAMETOOLONG;
504 if (plen != 0)
505 strncat(key, ".", max_key); /* really should be strcat */
506 strncat(key, iter.key, max_key);
508 ret = handler(handler_arg, iter.val, key);
509 if (ret == 1)
510 {
511 ret = traverse_json_impl(iter.val, key, max_key, handler,
512 handler_arg);
513 }
514 else if (ret != 0)
515 {
516 return ret;
517 }
519 key[plen] = '\0';
520 }
521 return 0;
522 }
524 static int traverse_json(const char *json, node_handler_t handler,
525 void *handler_arg)
526 {
527 json_object *root;
528 char buf[128];
529 buf[0] = '\0';
530 root = json_tokener_parse(json);
531 if (!root)
532 return -EDOM;
533 int result = traverse_json_impl(root, buf, sizeof(buf), handler, handler_arg);
534 json_object_put(root);
535 return result;
536 }
538 static int node_handler_define_schema(void *arg, json_object *jo,
539 const char *key)
540 {
541 struct ceph_daemon *d = (struct ceph_daemon *) arg;
542 int pc_type;
543 if (json_object_get_type(jo) == json_type_object)
544 return 1;
545 else if (json_object_get_type(jo) != json_type_int)
546 return -EDOM;
547 pc_type = json_object_get_int(jo);
548 DEBUG("\nceph_daemon_add_ds_entry(d=%s,key=%s,pc_type=%04x)",
549 d->name, key, pc_type);
550 return ceph_daemon_add_ds_entry(d, key, pc_type);
551 }
/** Values collected for one data set while parsing the JSON counter data. */
struct values_holder
{
    /** Number of elements in values (the data set's ds_num) */
    int values_len;
    /** Heap-allocated array, indexed like the data set's ds array */
    value_t *values;
};
558 /** A set of values_t data that we build up in memory while parsing the JSON. */
559 struct values_tmp
560 {
561 struct ceph_daemon *d;
562 int holder_num;
563 struct values_holder vh[0];
564 };
/** Latency-counter reading from the previous poll, looked up by
 * (dset_name, ds_name) in get_last_avg() / update_last(). */
struct last_data
{
    char dset_name[DATA_MAX_NAME_LEN];
    char ds_name[MAX_RRD_DS_NAME_LEN];
    /** "sum" value seen at the previous poll */
    double last_sum;
    /** "avgcount" value seen at the previous poll */
    uint64_t last_count;
};
574 int add_last(const char *dset_n, const char *ds_n, double cur_sum, uint64_t cur_count)
575 {
576 last_poll_data[last_idx] = malloc(1 * sizeof(struct last_data));
577 if(!last_poll_data[last_idx])
578 {
579 return ENOMEM;
580 }
581 sstrncpy(last_poll_data[last_idx]->dset_name,dset_n,sizeof(last_poll_data[last_idx]->dset_name));
582 sstrncpy(last_poll_data[last_idx]->ds_name,ds_n,sizeof(last_poll_data[last_idx]->ds_name));
583 last_poll_data[last_idx]->last_sum = cur_sum;
584 last_poll_data[last_idx]->last_count = cur_count;
585 last_idx++;
586 return 1;
587 }
589 int update_last(const char *dset_n, const char *ds_n, double cur_sum, uint64_t cur_count)
590 {
591 int i;
592 for(i = 0; i < last_idx; i++)
593 {
594 if(strcmp(last_poll_data[i]->dset_name,dset_n) == 0)
595 {
596 if(strcmp(last_poll_data[i]->ds_name,ds_n) == 0)
597 {
598 last_poll_data[i]->last_sum = cur_sum;
599 last_poll_data[i]->last_count = cur_count;
600 return 1;
601 }
602 }
603 }
605 if(NULL == last_poll_data)
606 {
607 last_poll_data = malloc(1 * sizeof(struct last_data *));
608 if(!last_poll_data)
609 {
610 return ENOMEM;
611 }
612 }
613 else
614 {
615 struct last_data **tmp_last = realloc(last_poll_data, ((last_idx+1) * sizeof(struct last_data *)));
616 if(!tmp_last)
617 {
618 return ENOMEM;
619 }
620 last_poll_data = tmp_last;
621 }
622 add_last(dset_n,ds_n,cur_sum,cur_count);
623 return -1;
624 }
626 double get_last_avg(const char *dset_n, const char *ds_n, double cur_sum, uint64_t cur_count)
627 {
628 int i;
629 double result = -1.1;
630 double sum_delt = 0.0;
631 uint64_t count_delt = 0;
632 for(i = 0; i < last_idx; i++)
633 {
634 if(strcmp(last_poll_data[i]->dset_name,dset_n) == 0)
635 {
636 if(strcmp(last_poll_data[i]->ds_name,ds_n) == 0)
637 {
638 if(cur_count < last_poll_data[i]->last_count)
639 {
640 break;
641 }
642 sum_delt = (cur_sum - last_poll_data[i]->last_sum);
643 count_delt = (cur_count - last_poll_data[i]->last_count);
644 result = (sum_delt / count_delt);
645 break;
646 }
647 }
648 }
650 result = (result == -1.1) ? NAN : result;
651 update_last(dset_n,ds_n,cur_sum,cur_count);
652 return result;
653 }
655 static int node_handler_fetch_data(void *arg, json_object *jo, const char *key)
656 {
657 int dset_idx, ds_idx;
658 value_t *uv;
659 char dset_name[DATA_MAX_NAME_LEN];
660 char ds_name[MAX_RRD_DS_NAME_LEN];
661 struct values_tmp *vtmp = (struct values_tmp*) arg;
662 memset(dset_name, 0, sizeof(dset_name));
663 memset(ds_name, 0, sizeof(ds_name));
664 if (parse_keys(key, dset_name, ds_name))
665 return 1;DEBUG("enter node_handler_fetch_data");
666 dset_idx = get_matching_dset(vtmp->d, dset_name);
667 if (dset_idx == -1)
668 return 1;
669 ds_idx = get_matching_value(&vtmp->d->dset[dset_idx], ds_name,
670 vtmp->d->dset[dset_idx].ds_num);
671 if (ds_idx == -1)
672 return 1;DEBUG("DSet:%s, DS:%s, DSet idx:%d, DS idx:%d",
673 dset_name,ds_name,dset_idx,ds_idx);
674 uv = &(vtmp->vh[dset_idx].values[ds_idx]);
675 if (vtmp->d->pc_types[dset_idx][ds_idx] & PERFCOUNTER_LATENCY)
676 {
677 json_object *avgcount, *sum;
678 uint64_t avgcounti;
679 double sumd;
680 if (json_object_get_type(jo) != json_type_object)
681 return -EINVAL;
682 avgcount = json_object_object_get(jo, "avgcount");
683 sum = json_object_object_get(jo, "sum");
684 if ((!avgcount) || (!sum))
685 return -EINVAL;
686 avgcounti = json_object_get_int(avgcount);
687 DEBUG("avgcounti:%ld",avgcounti);
688 if (avgcounti == 0)
689 avgcounti = 1;
690 sumd = json_object_get_double(sum);
691 DEBUG("sumd:%lf",sumd);
692 double last_avg = get_last_avg(dset_name, ds_name, sumd, avgcounti);
693 uv->gauge = last_avg;
694 DEBUG("uv->gauge = (sumd_now - sumd_last) / (avgcounti_now - avgcounti_last) = :%lf",uv->gauge);
695 }
696 else if (vtmp->d->pc_types[dset_idx][ds_idx] & PERFCOUNTER_DERIVE)
697 {
698 /* We use json_object_get_double here because anything > 32
699 * bits may get truncated by json_object_get_int */
700 uv->derive = (uint64_t) json_object_get_double(jo);
701 DEBUG("uv->derive %" PRIu64 "",(uint64_t)uv->derive);
702 }
703 else
704 {
705 uv->gauge = json_object_get_double(jo);
706 DEBUG("uv->gauge %lf",uv->gauge);
707 }
708 return 0;
709 }
711 /******* network I/O *******/
/** State of one admin-socket conversation (driven by cconn_handle_event()). */
enum cstate_t
{
    /** No socket open */
    CSTATE_UNCONNECTED = 0,
    /** Connected; sending the request command */
    CSTATE_WRITE_REQUEST,
    /** Reading the 4-byte, network-order interface version */
    CSTATE_READ_VERSION,
    /** Reading the 4-byte, network-order length of the JSON reply */
    CSTATE_READ_AMT,
    /** Reading the JSON reply itself */
    CSTATE_READ_JSON,
};
/** Request issued to a daemon's admin socket. The numeric value is sent as
 * the "prefix" of the JSON command (see cconn_handle_event()). */
enum request_type_t
{
    /** Query the admin-socket interface version */
    ASOK_REQ_VERSION = 0,
    /** Fetch counter values */
    ASOK_REQ_DATA = 1,
    /** Fetch the counter schema */
    ASOK_REQ_SCHEMA = 2,
    /** Nothing (left) to do for this connection */
    ASOK_REQ_NONE = 1000,
};
/** One in-flight admin-socket request to a Ceph daemon. */
struct cconn
{
    /** The Ceph daemon that we're talking to */
    struct ceph_daemon *d;

    /** Request type (an enum request_type_t value) */
    uint32_t request_type;

    /** The connection state */
    enum cstate_t state;

    /** The socket we use to talk to this daemon; cconn_close() sets it
     * to -1 */
    int asok;

    /** Number of bytes already written / read in the current state. Despite
     * the name, this counts progress, not the amount remaining. */
    uint32_t amt;

    /** Length of the JSON to read */
    uint32_t json_len;

    /** Buffer containing JSON data (json_len + 1 bytes, NUL-terminated) */
    char *json;
};
753 static int cconn_connect(struct cconn *io)
754 {
755 struct sockaddr_un address;
756 int flags, fd, err;
757 if (io->state != CSTATE_UNCONNECTED)
758 {
759 ERROR("cconn_connect: io->state != CSTATE_UNCONNECTED");
760 return -EDOM;
761 }
762 fd = socket(PF_UNIX, SOCK_STREAM, 0);
763 if (fd < 0)
764 {
765 int err = -errno;
766 ERROR("cconn_connect: socket(PF_UNIX, SOCK_STREAM, 0) failed: "
767 "error %d", err);
768 return err;
769 }
770 memset(&address, 0, sizeof(struct sockaddr_un));
771 address.sun_family = AF_UNIX;
772 snprintf(address.sun_path, sizeof(address.sun_path), "%s",
773 io->d->asok_path);
774 RETRY_ON_EINTR(err,
775 connect(fd, (struct sockaddr *) &address, sizeof(struct sockaddr_un)));
776 if (err < 0)
777 {
778 ERROR("cconn_connect: connect(%d) failed: error %d", fd, err);
779 return err;
780 }
782 flags = fcntl(fd, F_GETFL, 0);
783 if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) != 0)
784 {
785 err = -errno;
786 ERROR("cconn_connect: fcntl(%d, O_NONBLOCK) error %d", fd, err);
787 return err;
788 }
789 io->asok = fd;
790 io->state = CSTATE_WRITE_REQUEST;
791 io->amt = 0;
792 io->json_len = 0;
793 io->json = NULL;
794 return 0;
795 }
797 static void cconn_close(struct cconn *io)
798 {
799 io->state = CSTATE_UNCONNECTED;
800 if (io->asok != -1)
801 {
802 int res;
803 RETRY_ON_EINTR(res, close(io->asok));
804 }
805 io->asok = -1;
806 io->amt = 0;
807 io->json_len = 0;
808 sfree(io->json);
809 io->json = NULL;
810 }
812 /* Process incoming JSON counter data */
813 /*static int cconn_process_data(struct cconn *io)
814 {
815 int ret;
816 value_list_t vl = VALUE_LIST_INIT;
817 struct values_tmp *vtmp = calloc(1, sizeof(struct values_tmp) +
818 (sizeof(value_t) * io->d->dset.ds_num));
819 if (!vtmp)
820 return -ENOMEM;
821 vtmp->d = io->d;
822 vtmp->values_len = io->d->dset.ds_num;
823 ret = traverse_json(io->json, node_handler_fetch_data, vtmp);
824 if (ret)
825 goto done;
826 sstrncpy(vl.host, hostname_g, sizeof(vl.host));
827 sstrncpy(vl.plugin, "ceph", sizeof(vl.plugin));
828 sstrncpy(vl.type, io->d->dset.type, sizeof(vl.type));
829 vl.values = vtmp->values;
830 vl.values_len = vtmp->values_len;
831 DEBUG("cconn_process_data(io=%s): vl.values_len=%d, json=\"%s\"",
832 io->d->dset.type, vl.values_len, io->json);
833 ret = plugin_dispatch_values(&vl);
834 done:
835 sfree(vtmp);
836 return ret;
837 }*/
838 static int cconn_process_data(struct cconn *io)
839 {
840 int i, ret = 0;
841 struct values_tmp *vtmp = calloc(1,
842 sizeof(struct values_tmp)
843 + (sizeof(struct values_holder)) * io->d->dset_num);
844 if (!vtmp)
845 return -ENOMEM;
846 for (i = 0; i < io->d->dset_num; i++)
847 {
848 value_t *val = calloc(1, (sizeof(value_t) * io->d->dset[i].ds_num));
849 vtmp->vh[i].values = val;
850 vtmp->vh[i].values_len = io->d->dset[i].ds_num;
851 }
852 vtmp->d = io->d;
853 vtmp->holder_num = io->d->dset_num;
854 ret = traverse_json(io->json, node_handler_fetch_data, vtmp);
855 if (ret)
856 goto done;
857 for (i = 0; i < vtmp->holder_num; i++)
858 {
859 value_list_t vl = VALUE_LIST_INIT;
860 sstrncpy(vl.host, hostname_g, sizeof(vl.host));
861 sstrncpy(vl.plugin, "ceph", sizeof(vl.plugin));
862 strncpy(vl.plugin_instance, io->d->name, sizeof(vl.plugin_instance));
863 sstrncpy(vl.type, io->d->dset[i].type, sizeof(vl.type));
864 vl.values = vtmp->vh[i].values;
865 vl.values_len = vtmp->vh[i].values_len;
866 DEBUG("cconn_process_data(io=%s): vl.values_len=%d, json=\"%s\"",
867 io->d->name, vl.values_len, io->json);
868 ret = plugin_dispatch_values(&vl);
869 if (ret)
870 goto done;
871 }
873 done: for (i = 0; i < vtmp->holder_num; i++)
874 {
875 sfree(vtmp->vh[i].values);
876 }
877 sfree(vtmp);
878 return ret;
879 }
881 static int cconn_process_json(struct cconn *io)
882 {
883 switch (io->request_type)
884 {
885 case ASOK_REQ_DATA:
886 return cconn_process_data(io);
887 case ASOK_REQ_SCHEMA:
888 return traverse_json(io->json, node_handler_define_schema, io->d);
889 default:
890 return -EDOM;
891 }
892 }
894 static int cconn_validate_revents(struct cconn *io, int revents)
895 {
896 if (revents & POLLERR)
897 {
898 ERROR("cconn_validate_revents(name=%s): got POLLERR", io->d->name);
899 return -EIO;
900 }
901 switch (io->state)
902 {
903 case CSTATE_WRITE_REQUEST:
904 return (revents & POLLOUT) ? 0 : -EINVAL;
905 case CSTATE_READ_VERSION:
906 case CSTATE_READ_AMT:
907 case CSTATE_READ_JSON:
908 return (revents & POLLIN) ? 0 : -EINVAL;
909 return (revents & POLLIN) ? 0 : -EINVAL;
910 default:
911 ERROR("cconn_validate_revents(name=%s) got to illegal state on line %d",
912 io->d->name, __LINE__);
913 return -EDOM;
914 }
915 }
917 /** Handle a network event for a connection */
918 static int cconn_handle_event(struct cconn *io)
919 {
920 int ret;
921 switch (io->state)
922 {
923 case CSTATE_UNCONNECTED:
924 ERROR("cconn_handle_event(name=%s) got to illegal state on line %d",
925 io->d->name, __LINE__);
927 return -EDOM;
928 case CSTATE_WRITE_REQUEST:
929 {
930 char cmd[32];
931 /*snprintf(cmd, sizeof(cmd), "%s%d%s", "{\"prefix\":\"", io->request_type,
932 "\"}");*/
933 char req_type_str[2];
934 snprintf(req_type_str, sizeof(req_type_str), "%1.1d", io->request_type);
935 json_object *cmd_object = json_object_new_object();
936 json_object_object_add(cmd_object, "prefix",
937 json_object_new_string(req_type_str));
938 const char *cmd_json = json_object_to_json_string(cmd_object);
939 /** we should send '\n' to server **/
940 snprintf(cmd, sizeof(cmd), "%s\n", cmd_json);
941 size_t cmd_len = strlen(cmd);
942 RETRY_ON_EINTR(ret,
943 write(io->asok, ((char*)&cmd) + io->amt, cmd_len - io->amt));
944 DEBUG("cconn_handle_event(name=%s,state=%d,amt=%d,ret=%d)",
945 io->d->name, io->state, io->amt, ret);
946 if (ret < 0)
947 return ret;
948 io->amt += ret;
949 if (io->amt >= cmd_len)
950 {
951 io->amt = 0;
952 switch (io->request_type)
953 {
954 case ASOK_REQ_VERSION:
955 io->state = CSTATE_READ_VERSION;
956 break;
957 default:
958 io->state = CSTATE_READ_AMT;
959 break;
960 }
961 }
962 json_object_put(cmd_object);
963 return 0;
964 }
965 case CSTATE_READ_VERSION:
966 {
967 RETRY_ON_EINTR(ret,
968 read(io->asok, ((char*)(&io->d->version)) + io->amt,
969 sizeof(io->d->version) - io->amt));
970 DEBUG("cconn_handle_event(name=%s,state=%d,ret=%d)",
971 io->d->name, io->state, ret);
972 if (ret < 0)
973 return ret;
974 io->amt += ret;
975 if (io->amt >= sizeof(io->d->version))
976 {
977 io->d->version = ntohl(io->d->version);
978 if (io->d->version != 1)
979 {
980 ERROR("cconn_handle_event(name=%s) not "
981 "expecting version %d!", io->d->name, io->d->version);
982 return -ENOTSUP;
983 }DEBUG("cconn_handle_event(name=%s): identified as "
984 "version %d", io->d->name, io->d->version);
985 io->amt = 0;
986 cconn_close(io);
987 io->request_type = ASOK_REQ_SCHEMA;
988 }
989 return 0;
990 }
991 case CSTATE_READ_AMT:
992 {
993 RETRY_ON_EINTR(ret,
994 read(io->asok, ((char*)(&io->json_len)) + io->amt,
995 sizeof(io->json_len) - io->amt));
996 DEBUG("cconn_handle_event(name=%s,state=%d,ret=%d)",
997 io->d->name, io->state, ret);
998 if (ret < 0)
999 return ret;
1000 io->amt += ret;
1001 if (io->amt >= sizeof(io->json_len))
1002 {
1003 io->json_len = ntohl(io->json_len);
1004 io->amt = 0;
1005 io->state = CSTATE_READ_JSON;
1006 io->json = calloc(1, io->json_len + 1);
1007 if (!io->json)
1008 return -ENOMEM;
1009 }
1010 return 0;
1011 }
1012 case CSTATE_READ_JSON:
1013 {
1014 RETRY_ON_EINTR(ret,
1015 read(io->asok, io->json + io->amt, io->json_len - io->amt));
1016 DEBUG("cconn_handle_event(name=%s,state=%d,ret=%d)",
1017 io->d->name, io->state, ret);
1018 if (ret < 0)
1019 return ret;
1020 io->amt += ret;
1021 if (io->amt >= io->json_len)
1022 {
1023 ret = cconn_process_json(io);
1024 if (ret)
1025 return ret;
1026 cconn_close(io);
1027 io->request_type = ASOK_REQ_NONE;
1028 }
1029 return 0;
1030 }
1031 default:
1032 ERROR("cconn_handle_event(name=%s) got to illegal state on "
1033 "line %d", io->d->name, __LINE__);
1034 return -EDOM;
1035 }
1036 }
1038 static int cconn_prepare(struct cconn *io, struct pollfd* fds)
1039 {
1040 int ret;
1041 if (io->request_type == ASOK_REQ_NONE)
1042 {
1043 /* The request has already been serviced. */
1044 return 0;
1045 }
1046 else if ((io->request_type == ASOK_REQ_DATA) && (io->d->dset_num == 0))
1047 {
1048 /* If there are no counters to report on, don't bother
1049 * connecting */
1050 return 0;
1051 }
1053 switch (io->state)
1054 {
1055 case CSTATE_UNCONNECTED:
1056 ret = cconn_connect(io);
1057 if (ret > 0)
1058 return -ret;
1059 else if (ret < 0)
1060 return ret;
1061 fds->fd = io->asok;
1062 fds->events = POLLOUT;
1063 return 1;
1064 case CSTATE_WRITE_REQUEST:
1065 fds->fd = io->asok;
1066 fds->events = POLLOUT;
1067 return 1;
1068 case CSTATE_READ_VERSION:
1069 case CSTATE_READ_AMT:
1070 case CSTATE_READ_JSON:
1071 fds->fd = io->asok;
1072 fds->events = POLLIN;
1073 return 1;
1074 default:
1075 ERROR("cconn_prepare(name=%s) got to illegal state on line %d",
1076 io->d->name, __LINE__);
1077 return -EDOM;
1078 }
1079 }
/** Returns the difference t1 - t2 in milliseconds.
 * On overflow, we return max/min int.
 *
 * The whole-second and microsecond parts are converted separately; the
 * microsecond part is truncated toward zero. All arithmetic is done in
 * 64 bits. Computing sec_diff * 1000 in int would overflow (undefined
 * behaviour) for differences above roughly 24 days, before the clamping
 * below could take effect.
 */
static int milli_diff(const struct timeval *t1, const struct timeval *t2)
{
    int64_t sec_diff = (int64_t) t1->tv_sec - (int64_t) t2->tv_sec;
    int64_t usec_diff = (int64_t) t1->tv_usec - (int64_t) t2->tv_usec;
    int64_t ret = usec_diff / 1000 + sec_diff * 1000;

    if (ret > INT_MAX)
        return INT_MAX;
    if (ret < INT_MIN)
        return INT_MIN;
    return (int) ret;
}
1098 /** This handles the actual network I/O to talk to the Ceph daemons.
1099 */
1100 static int cconn_main_loop(uint32_t request_type)
1101 {
1102 int i, ret, some_unreachable = 0;
1103 struct timeval end_tv;
1104 struct cconn io_array[g_num_daemons];
1106 DEBUG("entering cconn_main_loop(request_type = %d)", request_type);
1108 /* create cconn array */
1109 memset(io_array, 0, sizeof(io_array));
1110 for (i = 0; i < g_num_daemons; ++i)
1111 {
1112 io_array[i].d = g_daemons[i];
1113 io_array[i].request_type = request_type;
1114 io_array[i].state = CSTATE_UNCONNECTED;
1115 }
1117 /** Calculate the time at which we should give up */
1118 gettimeofday(&end_tv, NULL);
1119 end_tv.tv_sec += CEPH_TIMEOUT_INTERVAL;
1121 while (1)
1122 {
1123 int nfds, diff;
1124 struct timeval tv;
1125 struct cconn *polled_io_array[g_num_daemons];
1126 struct pollfd fds[g_num_daemons];
1127 memset(fds, 0, sizeof(fds));
1128 nfds = 0;
1129 for (i = 0; i < g_num_daemons; ++i)
1130 {
1131 struct cconn *io = io_array + i;
1132 ret = cconn_prepare(io, fds + nfds);
1133 if (ret < 0)
1134 {
1135 WARNING("ERROR: cconn_prepare(name=%s,i=%d,st=%d)=%d",
1136 io->d->name, i, io->state, ret);
1137 cconn_close(io);
1138 io->request_type = ASOK_REQ_NONE;
1139 some_unreachable = 1;
1140 }
1141 else if (ret == 1)
1142 {
1143 DEBUG("did cconn_prepare(name=%s,i=%d,st=%d)",
1144 io->d->name, i, io->state);
1145 polled_io_array[nfds++] = io_array + i;
1146 }
1147 }
1148 if (nfds == 0)
1149 {
1150 /* finished */
1151 ret = 0;
1152 DEBUG("cconn_main_loop: no more cconn to manage.");
1153 goto done;
1154 }
1155 gettimeofday(&tv, NULL);
1156 diff = milli_diff(&end_tv, &tv);
1157 if (diff <= 0)
1158 {
1159 /* Timed out */
1160 ret = -ETIMEDOUT;
1161 WARNING("ERROR: cconn_main_loop: timed out.\n");
1162 goto done;
1163 }
1164 RETRY_ON_EINTR(ret, poll(fds, nfds, diff));
1165 if (ret < 0)
1166 {
1167 ERROR("poll(2) error: %d", ret);
1168 goto done;
1169 }
1170 for (i = 0; i < nfds; ++i)
1171 {
1172 struct cconn *io = polled_io_array[i];
1173 int revents = fds[i].revents;
1174 if (revents == 0)
1175 {
1176 /* do nothing */
1177 }
1178 else if (cconn_validate_revents(io, revents))
1179 {
1180 WARNING("ERROR: cconn(name=%s,i=%d,st=%d): "
1181 "revents validation error: "
1182 "revents=0x%08x", io->d->name, i, io->state, revents);
1183 cconn_close(io);
1184 io->request_type = ASOK_REQ_NONE;
1185 some_unreachable = 1;
1186 }
1187 else
1188 {
1189 int ret = cconn_handle_event(io);
1190 if (ret)
1191 {
1192 WARNING("ERROR: cconn_handle_event(name=%s,"
1193 "i=%d,st=%d): error %d", io->d->name, i, io->state, ret);
1194 cconn_close(io);
1195 io->request_type = ASOK_REQ_NONE;
1196 some_unreachable = 1;
1197 }
1198 }
1199 }
1200 }
1201 done: for (i = 0; i < g_num_daemons; ++i)
1202 {
1203 cconn_close(io_array + i);
1204 }
1205 if (some_unreachable)
1206 {
1207 DEBUG("cconn_main_loop: some Ceph daemons were unreachable.");
1208 }
1209 else
1210 {
1211 DEBUG("cconn_main_loop: reached all Ceph daemons :)");
1212 }
1213 return ret;
1214 }
1216 static int ceph_read(void)
1217 {
1218 return cconn_main_loop(ASOK_REQ_DATA);
1219 }
1221 /******* lifecycle *******/
1222 static int ceph_init(void)
1223 {
1224 int i, ret, j;
1225 DEBUG("ceph_init");
1226 ceph_daemons_print();
1228 ret = cconn_main_loop(ASOK_REQ_VERSION);
1229 if (ret)
1230 return ret;
1231 for (i = 0; i < g_num_daemons; ++i)
1232 {
1233 struct ceph_daemon *d = g_daemons[i];
1234 for (j = 0; j < d->dset_num; j++)
1235 {
1236 ret = plugin_register_data_set(d->dset + j);
1237 if (ret)
1238 {
1239 ERROR("plugin_register_data_set(%s) failed!", d->name);
1240 }
1241 else
1242 {
1243 DEBUG("plugin_register_data_set(%s): "
1244 "(d->dset)[%d]->ds_num=%d",
1245 d->name, j, d->dset[j].ds_num);
1246 }
1247 }
1248 }
1249 return 0;
1250 }
1252 static int ceph_shutdown(void)
1253 {
1254 int i;
1255 for (i = 0; i < g_num_daemons; ++i)
1256 {
1257 ceph_daemon_free(g_daemons[i]);
1258 }
1259 sfree(g_daemons);
1260 g_daemons = NULL;
1261 g_num_daemons = 0;
1262 for(i = 0; i < last_idx; i++)
1263 {
1264 sfree(last_poll_data[i]);
1265 }
1266 sfree(last_poll_data);
1267 last_poll_data = NULL;
1268 last_idx = 0;
1269 DEBUG("finished ceph_shutdown");
1270 return 0;
1271 }
1273 void module_register(void)
1274 {
1275 plugin_register_complex_config("ceph", ceph_config);
1276 plugin_register_init("ceph", ceph_init);
1277 plugin_register_read("ceph", ceph_read);
1278 plugin_register_shutdown("ceph", ceph_shutdown);
1279 }