1 /**
2 * collectd - src/rrdtool.c
3 * Copyright (C) 2006-2008 Florian octo Forster
4 *
5 * This program is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License as published by the
7 * Free Software Foundation; only version 2 of the License is applicable.
8 *
9 * This program is distributed in the hope that it will be useful, but
10 * WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License along
15 * with this program; if not, write to the Free Software Foundation, Inc.,
16 * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
17 *
18 * Authors:
19 * Florian octo Forster <octo at verplant.org>
20 **/
22 #include "collectd.h"
23 #include "plugin.h"
24 #include "common.h"
25 #include "utils_avltree.h"
27 #include <rrd.h>
29 #if HAVE_PTHREAD_H
30 # include <pthread.h>
31 #endif
33 /*
34 * Private types
35 */
/*
 * Cache entry for one RRD file: the update strings that have been collected
 * for the file but have not been written to disk yet.
 */
struct rrd_cache_s
{
  int     values_num;   /* number of strings stored in `values' */
  char  **values;       /* pending update strings, oldest first */
  time_t  first_value;  /* time of the first pending value */
  time_t  last_value;   /* time of the most recently inserted value */
  enum
  {
    FLAG_NONE   = 0x00,
    FLAG_QUEUED = 0x01  /* the file name currently sits in the update queue */
  } flags;
};
typedef struct rrd_cache_s rrd_cache_t;
/* Selects the end of the update queue at which a new entry is inserted. */
typedef enum rrd_queue_dir_e
{
  QUEUE_INSERT_FRONT, /* entry becomes the new head */
  QUEUE_INSERT_BACK   /* entry is appended after the tail */
} rrd_queue_dir_t;
/* Node of the singly linked update queue; one node per file to be written. */
typedef struct rrd_queue_s rrd_queue_t;
struct rrd_queue_s
{
  char *filename;            /* heap-allocated name of the RRD file */
  struct rrd_queue_s *next;  /* following node, NULL for the last one */
};
64 /*
65 * Private variables
66 */
67 static int rra_timespans[] =
68 {
69 3600,
70 86400,
71 604800,
72 2678400,
73 31622400
74 };
75 static int rra_timespans_num = STATIC_ARRAY_SIZE (rra_timespans);
77 static int *rra_timespans_custom = NULL;
78 static int rra_timespans_custom_num = 0;
80 static char *rra_types[] =
81 {
82 "AVERAGE",
83 "MIN",
84 "MAX"
85 };
86 static int rra_types_num = STATIC_ARRAY_SIZE (rra_types);
88 static const char *config_keys[] =
89 {
90 "CacheTimeout",
91 "CacheFlush",
92 "DataDir",
93 "StepSize",
94 "HeartBeat",
95 "RRARows",
96 "RRATimespan",
97 "XFF"
98 };
99 static int config_keys_num = STATIC_ARRAY_SIZE (config_keys);
101 /* If datadir is zero, the daemon's basedir is used. If stepsize or heartbeat
102 * is zero a default, depending on the `interval' member of the value list is
103 * being used. */
104 static char *datadir = NULL;
105 static int stepsize = 0;
106 static int heartbeat = 0;
107 static int rrarows = 1200;
108 static double xff = 0.1;
110 /* XXX: If you need to lock both, cache_lock and queue_lock, at the same time,
111 * ALWAYS lock `cache_lock' first! */
112 static int cache_timeout = 0;
113 static int cache_flush_timeout = 0;
114 static time_t cache_flush_last;
115 static c_avl_tree_t *cache = NULL;
116 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
118 static rrd_queue_t *queue_head = NULL;
119 static rrd_queue_t *queue_tail = NULL;
120 static pthread_t queue_thread = 0;
121 static pthread_mutex_t queue_lock = PTHREAD_MUTEX_INITIALIZER;
122 static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
124 #if !HAVE_THREADSAFE_LIBRRD
125 static pthread_mutex_t librrd_lock = PTHREAD_MUTEX_INITIALIZER;
126 #endif
128 static int do_shutdown = 0;
130 /* * * * * * * * * *
131 * WARNING: Magic *
132 * * * * * * * * * */
/*
 * Release an RRA definition list: the first `rra_num' strings of `rra_def'
 * and then the array itself.
 */
static void rra_free (int rra_num, char **rra_def)
{
  int idx = 0;

  while (idx < rra_num)
  {
    sfree (rra_def[idx]);
    idx++;
  }

  sfree (rra_def);
} /* void rra_free */
145 static int rra_get (char ***ret, const value_list_t *vl)
146 {
147 char **rra_def;
148 int rra_num;
150 int *rts;
151 int rts_num;
153 int rra_max;
155 int span;
157 int cdp_num;
158 int cdp_len;
159 int i, j;
161 char buffer[64];
163 /* The stepsize we use here: If it is user-set, use it. If not, use the
164 * interval of the value-list. */
165 int ss;
167 if (rrarows <= 0)
168 {
169 *ret = NULL;
170 return (-1);
171 }
173 ss = (stepsize > 0) ? stepsize : vl->interval;
174 if (ss <= 0)
175 {
176 *ret = NULL;
177 return (-1);
178 }
180 /* Use the configured timespans or fall back to the built-in defaults */
181 if (rra_timespans_custom_num != 0)
182 {
183 rts = rra_timespans_custom;
184 rts_num = rra_timespans_custom_num;
185 }
186 else
187 {
188 rts = rra_timespans;
189 rts_num = rra_timespans_num;
190 }
192 rra_max = rts_num * rra_types_num;
194 if ((rra_def = (char **) malloc ((rra_max + 1) * sizeof (char *))) == NULL)
195 return (-1);
196 memset (rra_def, '\0', (rra_max + 1) * sizeof (char *));
197 rra_num = 0;
199 cdp_len = 0;
200 for (i = 0; i < rts_num; i++)
201 {
202 span = rts[i];
204 if ((span / ss) < rrarows)
205 span = ss * rrarows;
207 if (cdp_len == 0)
208 cdp_len = 1;
209 else
210 cdp_len = (int) floor (((double) span)
211 / ((double) (rrarows * ss)));
213 cdp_num = (int) ceil (((double) span)
214 / ((double) (cdp_len * ss)));
216 for (j = 0; j < rra_types_num; j++)
217 {
218 if (rra_num >= rra_max)
219 break;
221 if (ssnprintf (buffer, sizeof (buffer), "RRA:%s:%3.1f:%u:%u",
222 rra_types[j], xff,
223 cdp_len, cdp_num) >= sizeof (buffer))
224 {
225 ERROR ("rra_get: Buffer would have been truncated.");
226 continue;
227 }
229 rra_def[rra_num++] = sstrdup (buffer);
230 }
231 }
233 #if COLLECT_DEBUG
234 DEBUG ("rra_num = %i", rra_num);
235 for (i = 0; i < rra_num; i++)
236 DEBUG (" %s", rra_def[i]);
237 #endif
239 *ret = rra_def;
240 return (rra_num);
241 } /* int rra_get */
/*
 * Release a data source definition list: the first `ds_num' strings of
 * `ds_def' (NULL slots are fine) and then the array itself.
 */
static void ds_free (int ds_num, char **ds_def)
{
  /* free(NULL) is a no-op, so empty slots need no special handling. */
  while (ds_num-- > 0)
    free (ds_def[ds_num]);

  free (ds_def);
}
253 static int ds_get (char ***ret, const data_set_t *ds, const value_list_t *vl)
254 {
255 char **ds_def;
256 int ds_num;
258 char min[32];
259 char max[32];
260 char buffer[128];
262 DEBUG ("ds->ds_num = %i", ds->ds_num);
264 ds_def = (char **) malloc (ds->ds_num * sizeof (char *));
265 if (ds_def == NULL)
266 {
267 char errbuf[1024];
268 ERROR ("rrdtool plugin: malloc failed: %s",
269 sstrerror (errno, errbuf, sizeof (errbuf)));
270 return (-1);
271 }
272 memset (ds_def, '\0', ds->ds_num * sizeof (char *));
274 for (ds_num = 0; ds_num < ds->ds_num; ds_num++)
275 {
276 data_source_t *d = ds->ds + ds_num;
277 char *type;
278 int status;
280 ds_def[ds_num] = NULL;
282 if (d->type == DS_TYPE_COUNTER)
283 type = "COUNTER";
284 else if (d->type == DS_TYPE_GAUGE)
285 type = "GAUGE";
286 else
287 {
288 ERROR ("rrdtool plugin: Unknown DS type: %i",
289 d->type);
290 break;
291 }
293 if (isnan (d->min))
294 {
295 sstrncpy (min, "U", sizeof (min));
296 }
297 else
298 ssnprintf (min, sizeof (min), "%lf", d->min);
300 if (isnan (d->max))
301 {
302 sstrncpy (max, "U", sizeof (max));
303 }
304 else
305 ssnprintf (max, sizeof (max), "%lf", d->max);
307 status = ssnprintf (buffer, sizeof (buffer),
308 "DS:%s:%s:%i:%s:%s",
309 d->name, type,
310 (heartbeat > 0) ? heartbeat : (2 * vl->interval),
311 min, max);
312 if ((status < 1) || (status >= sizeof (buffer)))
313 break;
315 ds_def[ds_num] = sstrdup (buffer);
316 } /* for ds_num = 0 .. ds->ds_num */
318 #if COLLECT_DEBUG
319 {
320 int i;
321 DEBUG ("ds_num = %i", ds_num);
322 for (i = 0; i < ds_num; i++)
323 DEBUG (" %s", ds_def[i]);
324 }
325 #endif
327 if (ds_num != ds->ds_num)
328 {
329 ds_free (ds_num, ds_def);
330 return (-1);
331 }
333 *ret = ds_def;
334 return (ds_num);
335 }
337 #if HAVE_THREADSAFE_LIBRRD
338 static int srrd_create (char *filename, unsigned long pdp_step, time_t last_up,
339 int argc, const char **argv)
340 {
341 int status;
343 optind = 0; /* bug in librrd? */
344 rrd_clear_error ();
346 status = rrd_create_r (filename, pdp_step, last_up, argc, (void *) argv);
348 if (status != 0)
349 {
350 WARNING ("rrdtool plugin: rrd_create_r (%s) failed: %s",
351 filename, rrd_get_error ());
352 }
354 return (status);
355 } /* int srrd_create */
357 static int srrd_update (char *filename, char *template,
358 int argc, const char **argv)
359 {
360 int status;
362 optind = 0; /* bug in librrd? */
363 rrd_clear_error ();
365 status = rrd_update_r (filename, template, argc, (void *) argv);
367 if (status != 0)
368 {
369 WARNING ("rrdtool plugin: rrd_update_r (%s) failed: %s",
370 filename, rrd_get_error ());
371 }
373 return (status);
374 } /* int srrd_update */
375 /* #endif HAVE_THREADSAFE_LIBRRD */
377 #else /* !HAVE_THREADSAFE_LIBRRD */
378 static int srrd_create (char *filename, unsigned long pdp_step, time_t last_up,
379 int argc, const char **argv)
380 {
381 int status;
383 int new_argc;
384 char **new_argv;
386 char pdp_step_str[16];
387 char last_up_str[16];
389 new_argc = 6 + argc;
390 new_argv = (char **) malloc ((new_argc + 1) * sizeof (char *));
391 if (new_argv == NULL)
392 {
393 ERROR ("rrdtool plugin: malloc failed.");
394 return (-1);
395 }
397 if (last_up == 0)
398 last_up = time (NULL) - 10;
400 ssnprintf (pdp_step_str, sizeof (pdp_step_str), "%lu", pdp_step);
401 ssnprintf (last_up_str, sizeof (last_up_str), "%u", (unsigned int) last_up);
403 new_argv[0] = "create";
404 new_argv[1] = filename;
405 new_argv[2] = "-s";
406 new_argv[3] = pdp_step_str;
407 new_argv[4] = "-b";
408 new_argv[5] = last_up_str;
410 memcpy (new_argv + 6, argv, argc * sizeof (char *));
411 new_argv[new_argc] = NULL;
413 pthread_mutex_lock (&librrd_lock);
414 optind = 0; /* bug in librrd? */
415 rrd_clear_error ();
417 status = rrd_create (new_argc, new_argv);
418 pthread_mutex_unlock (&librrd_lock);
420 if (status != 0)
421 {
422 WARNING ("rrdtool plugin: rrd_create (%s) failed: %s",
423 filename, rrd_get_error ());
424 }
426 sfree (new_argv);
428 return (status);
429 } /* int srrd_create */
431 static int srrd_update (char *filename, char *template,
432 int argc, const char **argv)
433 {
434 int status;
436 int new_argc;
437 char **new_argv;
439 assert (template == NULL);
441 new_argc = 2 + argc;
442 new_argv = (char **) malloc ((new_argc + 1) * sizeof (char *));
443 if (new_argv == NULL)
444 {
445 ERROR ("rrdtool plugin: malloc failed.");
446 return (-1);
447 }
449 new_argv[0] = "update";
450 new_argv[1] = filename;
452 memcpy (new_argv + 2, argv, argc * sizeof (char *));
453 new_argv[new_argc] = NULL;
455 pthread_mutex_lock (&librrd_lock);
456 optind = 0; /* bug in librrd? */
457 rrd_clear_error ();
459 status = rrd_update (new_argc, new_argv);
460 pthread_mutex_unlock (&librrd_lock);
462 if (status != 0)
463 {
464 WARNING ("rrdtool plugin: rrd_update_r failed: %s: %s",
465 argv[1], rrd_get_error ());
466 }
468 sfree (new_argv);
470 return (status);
471 } /* int srrd_update */
472 #endif /* !HAVE_THREADSAFE_LIBRRD */
474 static int rrd_create_file (char *filename, const data_set_t *ds, const value_list_t *vl)
475 {
476 char **argv;
477 int argc;
478 char **rra_def;
479 int rra_num;
480 char **ds_def;
481 int ds_num;
482 int status = 0;
484 if (check_create_dir (filename))
485 return (-1);
487 if ((rra_num = rra_get (&rra_def, vl)) < 1)
488 {
489 ERROR ("rrd_create_file failed: Could not calculate RRAs");
490 return (-1);
491 }
493 if ((ds_num = ds_get (&ds_def, ds, vl)) < 1)
494 {
495 ERROR ("rrd_create_file failed: Could not calculate DSes");
496 return (-1);
497 }
499 argc = ds_num + rra_num;
501 if ((argv = (char **) malloc (sizeof (char *) * (argc + 1))) == NULL)
502 {
503 char errbuf[1024];
504 ERROR ("rrd_create failed: %s",
505 sstrerror (errno, errbuf, sizeof (errbuf)));
506 return (-1);
507 }
509 memcpy (argv, ds_def, ds_num * sizeof (char *));
510 memcpy (argv + ds_num, rra_def, rra_num * sizeof (char *));
511 argv[ds_num + rra_num] = NULL;
513 assert (vl->time > 10);
514 status = srrd_create (filename,
515 (stepsize > 0) ? stepsize : vl->interval,
516 vl->time - 10,
517 argc, (const char **)argv);
519 free (argv);
520 ds_free (ds_num, ds_def);
521 rra_free (rra_num, rra_def);
523 return (status);
524 }
526 static int value_list_to_string (char *buffer, int buffer_len,
527 const data_set_t *ds, const value_list_t *vl)
528 {
529 int offset;
530 int status;
531 int i;
533 memset (buffer, '\0', buffer_len);
535 status = ssnprintf (buffer, buffer_len, "%u", (unsigned int) vl->time);
536 if ((status < 1) || (status >= buffer_len))
537 return (-1);
538 offset = status;
540 for (i = 0; i < ds->ds_num; i++)
541 {
542 if ((ds->ds[i].type != DS_TYPE_COUNTER)
543 && (ds->ds[i].type != DS_TYPE_GAUGE))
544 return (-1);
546 if (ds->ds[i].type == DS_TYPE_COUNTER)
547 status = ssnprintf (buffer + offset, buffer_len - offset,
548 ":%llu", vl->values[i].counter);
549 else
550 status = ssnprintf (buffer + offset, buffer_len - offset,
551 ":%lf", vl->values[i].gauge);
553 if ((status < 1) || (status >= (buffer_len - offset)))
554 return (-1);
556 offset += status;
557 } /* for ds->ds_num */
559 return (0);
560 } /* int value_list_to_string */
562 static int value_list_to_filename (char *buffer, int buffer_len,
563 const data_set_t *ds, const value_list_t *vl)
564 {
565 int offset = 0;
566 int status;
568 if (datadir != NULL)
569 {
570 status = ssnprintf (buffer + offset, buffer_len - offset,
571 "%s/", datadir);
572 if ((status < 1) || (status >= buffer_len - offset))
573 return (-1);
574 offset += status;
575 }
577 status = ssnprintf (buffer + offset, buffer_len - offset,
578 "%s/", vl->host);
579 if ((status < 1) || (status >= buffer_len - offset))
580 return (-1);
581 offset += status;
583 if (strlen (vl->plugin_instance) > 0)
584 status = ssnprintf (buffer + offset, buffer_len - offset,
585 "%s-%s/", vl->plugin, vl->plugin_instance);
586 else
587 status = ssnprintf (buffer + offset, buffer_len - offset,
588 "%s/", vl->plugin);
589 if ((status < 1) || (status >= buffer_len - offset))
590 return (-1);
591 offset += status;
593 if (strlen (vl->type_instance) > 0)
594 status = ssnprintf (buffer + offset, buffer_len - offset,
595 "%s-%s.rrd", vl->type, vl->type_instance);
596 else
597 status = ssnprintf (buffer + offset, buffer_len - offset,
598 "%s.rrd", vl->type);
599 if ((status < 1) || (status >= buffer_len - offset))
600 return (-1);
601 offset += status;
603 return (0);
604 } /* int value_list_to_filename */
606 static void *rrd_queue_thread (void *data)
607 {
608 while (42)
609 {
610 rrd_queue_t *queue_entry;
611 rrd_cache_t *cache_entry;
612 char **values;
613 int values_num;
614 int i;
616 /* XXX: If you need to lock both, cache_lock and queue_lock, at
617 * the same time, ALWAYS lock `cache_lock' first! */
619 /* wait until an entry is available */
620 pthread_mutex_lock (&queue_lock);
621 while ((queue_head == NULL) && (do_shutdown == 0))
622 pthread_cond_wait (&queue_cond, &queue_lock);
624 /* We're in the shutdown phase */
625 if (queue_head == NULL)
626 {
627 pthread_mutex_unlock (&queue_lock);
628 break;
629 }
631 /* Dequeue the first entry */
632 queue_entry = queue_head;
633 if (queue_head == queue_tail)
634 queue_head = queue_tail = NULL;
635 else
636 queue_head = queue_head->next;
638 /* Unlock the queue again */
639 pthread_mutex_unlock (&queue_lock);
641 /* We now need the cache lock so the entry isn't updated while
642 * we make a copy of it's values */
643 pthread_mutex_lock (&cache_lock);
645 c_avl_get (cache, queue_entry->filename, (void *) &cache_entry);
647 values = cache_entry->values;
648 values_num = cache_entry->values_num;
650 cache_entry->values = NULL;
651 cache_entry->values_num = 0;
652 cache_entry->flags = FLAG_NONE;
654 pthread_mutex_unlock (&cache_lock);
656 /* Write the values to the RRD-file */
657 srrd_update (queue_entry->filename, NULL,
658 values_num, (const char **)values);
659 DEBUG ("rrdtool plugin: queue thread: Wrote %i values to %s",
660 values_num, queue_entry->filename);
662 for (i = 0; i < values_num; i++)
663 {
664 sfree (values[i]);
665 }
666 sfree (values);
667 sfree (queue_entry->filename);
668 sfree (queue_entry);
669 } /* while (42) */
671 pthread_mutex_lock (&cache_lock);
672 c_avl_destroy (cache);
673 cache = NULL;
674 pthread_mutex_unlock (&cache_lock);
676 pthread_exit ((void *) 0);
677 return ((void *) 0);
678 } /* void *rrd_queue_thread */
680 static int rrd_queue_cache_entry (const char *filename, rrd_queue_dir_t dir)
681 {
682 rrd_queue_t *queue_entry;
684 queue_entry = (rrd_queue_t *) malloc (sizeof (rrd_queue_t));
685 if (queue_entry == NULL)
686 return (-1);
688 queue_entry->filename = strdup (filename);
689 if (queue_entry->filename == NULL)
690 {
691 free (queue_entry);
692 return (-1);
693 }
695 queue_entry->next = NULL;
697 pthread_mutex_lock (&queue_lock);
698 if (dir == QUEUE_INSERT_FRONT)
699 {
700 queue_entry->next = queue_head;
701 queue_head = queue_entry;
702 if (queue_tail == NULL)
703 queue_tail = queue_head;
704 }
705 else /* (dir == QUEUE_INSERT_BACK) */
706 {
707 if (queue_tail == NULL)
708 queue_head = queue_entry;
709 else
710 queue_tail->next = queue_entry;
711 queue_tail = queue_entry;
712 }
713 pthread_cond_signal (&queue_cond);
714 pthread_mutex_unlock (&queue_lock);
716 DEBUG ("rrdtool plugin: Put `%s' into the update queue", filename);
718 return (0);
719 } /* int rrd_queue_cache_entry */
721 static int rrd_queue_move_to_front (const char *filename)
722 {
723 rrd_queue_t *this;
724 rrd_queue_t *prev;
726 this = NULL;
727 prev = NULL;
728 pthread_mutex_lock (&queue_lock);
729 for (this = queue_head; this != NULL; this = this->next)
730 {
731 if (strcmp (this->filename, filename) == 0)
732 break;
733 prev = this;
734 }
736 /* Check if we found the entry and if it is NOT the first entry. */
737 if ((this != NULL) && (prev != NULL))
738 {
739 prev->next = this->next;
740 this->next = queue_head;
741 queue_head = this;
742 }
743 pthread_mutex_unlock (&queue_lock);
745 return (0);
746 } /* int rrd_queue_move_to_front */
748 static void rrd_cache_flush (int timeout)
749 {
750 rrd_cache_t *rc;
751 time_t now;
753 char **keys = NULL;
754 int keys_num = 0;
756 char *key;
757 c_avl_iterator_t *iter;
758 int i;
760 DEBUG ("rrdtool plugin: Flushing cache, timeout = %i", timeout);
762 now = time (NULL);
764 /* Build a list of entries to be flushed */
765 iter = c_avl_get_iterator (cache);
766 while (c_avl_iterator_next (iter, (void *) &key, (void *) &rc) == 0)
767 {
768 if (rc->flags == FLAG_QUEUED)
769 continue;
770 else if ((now - rc->first_value) < timeout)
771 continue;
772 else if (rc->values_num > 0)
773 {
774 if (rrd_queue_cache_entry (key, QUEUE_INSERT_BACK) == 0)
775 rc->flags = FLAG_QUEUED;
776 }
777 else /* ancient and no values -> waste of memory */
778 {
779 char **tmp = (char **) realloc ((void *) keys,
780 (keys_num + 1) * sizeof (char *));
781 if (tmp == NULL)
782 {
783 char errbuf[1024];
784 ERROR ("rrdtool plugin: "
785 "realloc failed: %s",
786 sstrerror (errno, errbuf,
787 sizeof (errbuf)));
788 c_avl_iterator_destroy (iter);
789 sfree (keys);
790 return;
791 }
792 keys = tmp;
793 keys[keys_num] = key;
794 keys_num++;
795 }
796 } /* while (c_avl_iterator_next) */
797 c_avl_iterator_destroy (iter);
799 for (i = 0; i < keys_num; i++)
800 {
801 if (c_avl_remove (cache, keys[i], (void *) &key, (void *) &rc) != 0)
802 {
803 DEBUG ("rrdtool plugin: c_avl_remove (%s) failed.", keys[i]);
804 continue;
805 }
807 assert (rc->values == NULL);
808 assert (rc->values_num == 0);
810 sfree (rc);
811 sfree (key);
812 keys[i] = NULL;
813 } /* for (i = 0..keys_num) */
815 sfree (keys);
817 cache_flush_last = now;
818 } /* void rrd_cache_flush */
820 static int rrd_cache_flush_identifier (int timeout, const char *identifier)
821 {
822 rrd_cache_t *rc;
823 time_t now;
824 int status;
825 char key[2048];
827 if (identifier == NULL)
828 {
829 rrd_cache_flush (timeout);
830 return (0);
831 }
833 now = time (NULL);
835 if (datadir == NULL)
836 snprintf (key, sizeof (key), "%s.rrd",
837 identifier);
838 else
839 snprintf (key, sizeof (key), "%s/%s.rrd",
840 datadir, identifier);
841 key[sizeof (key) - 1] = 0;
843 status = c_avl_get (cache, key, (void *) &rc);
844 if (status != 0)
845 {
846 WARNING ("rrdtool plugin: rrd_cache_flush_identifier: "
847 "c_avl_get (%s) failed. Does that file really exist?",
848 key);
849 return (status);
850 }
852 if (rc->flags == FLAG_QUEUED)
853 status = rrd_queue_move_to_front (key);
854 else if ((now - rc->first_value) < timeout)
855 status = 0;
856 else if (rc->values_num > 0)
857 {
858 status = rrd_queue_cache_entry (key, QUEUE_INSERT_FRONT);
859 if (status == 0)
860 rc->flags = FLAG_QUEUED;
861 }
863 return (status);
864 } /* int rrd_cache_flush_identifier */
866 static int rrd_cache_insert (const char *filename,
867 const char *value, time_t value_time)
868 {
869 rrd_cache_t *rc = NULL;
870 int new_rc = 0;
871 char **values_new;
873 pthread_mutex_lock (&cache_lock);
875 c_avl_get (cache, filename, (void *) &rc);
877 if (rc == NULL)
878 {
879 rc = (rrd_cache_t *) malloc (sizeof (rrd_cache_t));
880 if (rc == NULL)
881 return (-1);
882 rc->values_num = 0;
883 rc->values = NULL;
884 rc->first_value = 0;
885 rc->last_value = 0;
886 rc->flags = FLAG_NONE;
887 new_rc = 1;
888 }
890 if (rc->last_value >= value_time)
891 {
892 pthread_mutex_unlock (&cache_lock);
893 WARNING ("rrdtool plugin: (rc->last_value = %u) >= (value_time = %u)",
894 (unsigned int) rc->last_value,
895 (unsigned int) value_time);
896 return (-1);
897 }
899 values_new = (char **) realloc ((void *) rc->values,
900 (rc->values_num + 1) * sizeof (char *));
901 if (values_new == NULL)
902 {
903 char errbuf[1024];
904 void *cache_key = NULL;
906 sstrerror (errno, errbuf, sizeof (errbuf));
908 c_avl_remove (cache, filename, &cache_key, NULL);
909 pthread_mutex_unlock (&cache_lock);
911 ERROR ("rrdtool plugin: realloc failed: %s", errbuf);
913 sfree (cache_key);
914 sfree (rc->values);
915 sfree (rc);
916 return (-1);
917 }
918 rc->values = values_new;
920 rc->values[rc->values_num] = strdup (value);
921 if (rc->values[rc->values_num] != NULL)
922 rc->values_num++;
924 if (rc->values_num == 1)
925 rc->first_value = value_time;
926 rc->last_value = value_time;
928 /* Insert if this is the first value */
929 if (new_rc == 1)
930 {
931 void *cache_key = strdup (filename);
933 if (cache_key == NULL)
934 {
935 char errbuf[1024];
936 sstrerror (errno, errbuf, sizeof (errbuf));
938 pthread_mutex_unlock (&cache_lock);
940 ERROR ("rrdtool plugin: strdup failed: %s", errbuf);
942 sfree (rc->values[0]);
943 sfree (rc->values);
944 sfree (rc);
945 return (-1);
946 }
948 c_avl_insert (cache, cache_key, rc);
949 }
951 DEBUG ("rrdtool plugin: rrd_cache_insert: file = %s; "
952 "values_num = %i; age = %lu;",
953 filename, rc->values_num,
954 (unsigned long)(rc->last_value - rc->first_value));
956 if ((rc->last_value - rc->first_value) >= cache_timeout)
957 {
958 /* XXX: If you need to lock both, cache_lock and queue_lock, at
959 * the same time, ALWAYS lock `cache_lock' first! */
960 if (rc->flags != FLAG_QUEUED)
961 {
962 if (rrd_queue_cache_entry (filename, QUEUE_INSERT_BACK) == 0)
963 rc->flags = FLAG_QUEUED;
964 }
965 else
966 {
967 DEBUG ("rrdtool plugin: `%s' is already queued.", filename);
968 }
969 }
971 if ((cache_timeout > 0) &&
972 ((time (NULL) - cache_flush_last) > cache_flush_timeout))
973 rrd_cache_flush (cache_flush_timeout);
976 pthread_mutex_unlock (&cache_lock);
978 return (0);
979 } /* int rrd_cache_insert */
/*
 * qsort() comparison callback for arrays of `int'.
 *
 * Returns -1, 0 or 1 if the first value is smaller than, equal to or greater
 * than the second one.
 */
static int rrd_compare_numeric (const void *a_ptr, const void *b_ptr)
{
  const int lhs = *((const int *) a_ptr);
  const int rhs = *((const int *) b_ptr);

  /* Each comparison yields 0 or 1, so the difference is -1, 0 or 1. */
  return ((lhs > rhs) - (lhs < rhs));
} /* int rrd_compare_numeric */
994 static int rrd_write (const data_set_t *ds, const value_list_t *vl)
995 {
996 struct stat statbuf;
997 char filename[512];
998 char values[512];
999 int status;
1001 if (0 != strcmp (ds->type, vl->type)) {
1002 ERROR ("rrdtool plugin: DS type does not match value list type");
1003 return -1;
1004 }
1006 if (value_list_to_filename (filename, sizeof (filename), ds, vl) != 0)
1007 return (-1);
1009 if (value_list_to_string (values, sizeof (values), ds, vl) != 0)
1010 return (-1);
1012 if (stat (filename, &statbuf) == -1)
1013 {
1014 if (errno == ENOENT)
1015 {
1016 if (rrd_create_file (filename, ds, vl))
1017 return (-1);
1018 }
1019 else
1020 {
1021 char errbuf[1024];
1022 ERROR ("stat(%s) failed: %s", filename,
1023 sstrerror (errno, errbuf,
1024 sizeof (errbuf)));
1025 return (-1);
1026 }
1027 }
1028 else if (!S_ISREG (statbuf.st_mode))
1029 {
1030 ERROR ("stat(%s): Not a regular file!",
1031 filename);
1032 return (-1);
1033 }
1035 status = rrd_cache_insert (filename, values, vl->time);
1037 return (status);
1038 } /* int rrd_write */
1040 static int rrd_flush (int timeout, const char *identifier)
1041 {
1042 pthread_mutex_lock (&cache_lock);
1044 if (cache == NULL) {
1045 pthread_mutex_unlock (&cache_lock);
1046 return (0);
1047 }
1049 rrd_cache_flush_identifier (timeout, identifier);
1051 pthread_mutex_unlock (&cache_lock);
1052 return (0);
1053 } /* int rrd_flush */
1055 static int rrd_config (const char *key, const char *value)
1056 {
1057 if (strcasecmp ("CacheTimeout", key) == 0)
1058 {
1059 int tmp = atoi (value);
1060 if (tmp < 0)
1061 {
1062 fprintf (stderr, "rrdtool: `CacheTimeout' must "
1063 "be greater than 0.\n");
1064 return (1);
1065 }
1066 cache_timeout = tmp;
1067 }
1068 else if (strcasecmp ("CacheFlush", key) == 0)
1069 {
1070 int tmp = atoi (value);
1071 if (tmp < 0)
1072 {
1073 fprintf (stderr, "rrdtool: `CacheFlush' must "
1074 "be greater than 0.\n");
1075 return (1);
1076 }
1077 cache_flush_timeout = tmp;
1078 }
1079 else if (strcasecmp ("DataDir", key) == 0)
1080 {
1081 if (datadir != NULL)
1082 free (datadir);
1083 datadir = strdup (value);
1084 if (datadir != NULL)
1085 {
1086 int len = strlen (datadir);
1087 while ((len > 0) && (datadir[len - 1] == '/'))
1088 {
1089 len--;
1090 datadir[len] = '\0';
1091 }
1092 if (len <= 0)
1093 {
1094 free (datadir);
1095 datadir = NULL;
1096 }
1097 }
1098 }
1099 else if (strcasecmp ("StepSize", key) == 0)
1100 {
1101 stepsize = atoi (value);
1102 if (stepsize < 0)
1103 stepsize = 0;
1104 }
1105 else if (strcasecmp ("HeartBeat", key) == 0)
1106 {
1107 heartbeat = atoi (value);
1108 if (heartbeat < 0)
1109 heartbeat = 0;
1110 }
1111 else if (strcasecmp ("RRARows", key) == 0)
1112 {
1113 int tmp = atoi (value);
1114 if (tmp <= 0)
1115 {
1116 fprintf (stderr, "rrdtool: `RRARows' must "
1117 "be greater than 0.\n");
1118 return (1);
1119 }
1120 rrarows = tmp;
1121 }
1122 else if (strcasecmp ("RRATimespan", key) == 0)
1123 {
1124 char *saveptr = NULL;
1125 char *dummy;
1126 char *ptr;
1127 char *value_copy;
1128 int *tmp_alloc;
1130 value_copy = strdup (value);
1131 if (value_copy == NULL)
1132 return (1);
1134 dummy = value_copy;
1135 while ((ptr = strtok_r (dummy, ", \t", &saveptr)) != NULL)
1136 {
1137 dummy = NULL;
1139 tmp_alloc = realloc (rra_timespans_custom,
1140 sizeof (int) * (rra_timespans_custom_num + 1));
1141 if (tmp_alloc == NULL)
1142 {
1143 fprintf (stderr, "rrdtool: realloc failed.\n");
1144 free (value_copy);
1145 return (1);
1146 }
1147 rra_timespans_custom = tmp_alloc;
1148 rra_timespans_custom[rra_timespans_custom_num] = atoi (ptr);
1149 if (rra_timespans_custom[rra_timespans_custom_num] != 0)
1150 rra_timespans_custom_num++;
1151 } /* while (strtok_r) */
1153 qsort (/* base = */ rra_timespans_custom,
1154 /* nmemb = */ rra_timespans_custom_num,
1155 /* size = */ sizeof (rra_timespans_custom[0]),
1156 /* compar = */ rrd_compare_numeric);
1158 free (value_copy);
1159 }
1160 else if (strcasecmp ("XFF", key) == 0)
1161 {
1162 double tmp = atof (value);
1163 if ((tmp < 0.0) || (tmp >= 1.0))
1164 {
1165 fprintf (stderr, "rrdtool: `XFF' must "
1166 "be in the range 0 to 1 (exclusive).");
1167 return (1);
1168 }
1169 xff = tmp;
1170 }
1171 else
1172 {
1173 return (-1);
1174 }
1175 return (0);
1176 } /* int rrd_config */
1178 static int rrd_shutdown (void)
1179 {
1180 pthread_mutex_lock (&cache_lock);
1181 rrd_cache_flush (-1);
1182 pthread_mutex_unlock (&cache_lock);
1184 pthread_mutex_lock (&queue_lock);
1185 do_shutdown = 1;
1186 pthread_cond_signal (&queue_cond);
1187 pthread_mutex_unlock (&queue_lock);
1189 /* Wait for all the values to be written to disk before returning. */
1190 if (queue_thread != 0)
1191 {
1192 pthread_join (queue_thread, NULL);
1193 queue_thread = 0;
1194 DEBUG ("rrdtool plugin: queue_thread exited.");
1195 }
1197 return (0);
1198 } /* int rrd_shutdown */
1200 static int rrd_init (void)
1201 {
1202 int status;
1204 if (stepsize < 0)
1205 stepsize = 0;
1206 if (heartbeat <= 0)
1207 heartbeat = 2 * stepsize;
1209 if ((heartbeat > 0) && (heartbeat < interval_g))
1210 WARNING ("rrdtool plugin: Your `heartbeat' is "
1211 "smaller than your `interval'. This will "
1212 "likely cause problems.");
1213 else if ((stepsize > 0) && (stepsize < interval_g))
1214 WARNING ("rrdtool plugin: Your `stepsize' is "
1215 "smaller than your `interval'. This will "
1216 "create needlessly big RRD-files.");
1218 /* Set the cache up */
1219 pthread_mutex_lock (&cache_lock);
1221 cache = c_avl_create ((int (*) (const void *, const void *)) strcmp);
1222 if (cache == NULL)
1223 {
1224 ERROR ("rrdtool plugin: c_avl_create failed.");
1225 return (-1);
1226 }
1228 cache_flush_last = time (NULL);
1229 if (cache_timeout < 2)
1230 {
1231 cache_timeout = 0;
1232 cache_flush_timeout = 0;
1233 }
1234 else if (cache_flush_timeout < cache_timeout)
1235 cache_flush_timeout = 10 * cache_timeout;
1237 pthread_mutex_unlock (&cache_lock);
1239 status = pthread_create (&queue_thread, NULL, rrd_queue_thread, NULL);
1240 if (status != 0)
1241 {
1242 ERROR ("rrdtool plugin: Cannot create queue-thread.");
1243 return (-1);
1244 }
1246 DEBUG ("rrdtool plugin: rrd_init: datadir = %s; stepsize = %i;"
1247 " heartbeat = %i; rrarows = %i; xff = %lf;",
1248 (datadir == NULL) ? "(null)" : datadir,
1249 stepsize, heartbeat, rrarows, xff);
1251 return (0);
1252 } /* int rrd_init */
1254 void module_register (void)
1255 {
1256 plugin_register_config ("rrdtool", rrd_config,
1257 config_keys, config_keys_num);
1258 plugin_register_init ("rrdtool", rrd_init);
1259 plugin_register_write ("rrdtool", rrd_write);
1260 plugin_register_flush ("rrdtool", rrd_flush);
1261 plugin_register_shutdown ("rrdtool", rrd_shutdown);
1262 }