1 /**
2 * collectd - src/rrdtool.c
3 * Copyright (C) 2006-2008 Florian octo Forster
4 * Copyright (C) 2008-2008 Sebastian Harl
5 * Copyright (C) 2009 Mariusz Gronczewski
6 *
7 * This program is free software; you can redistribute it and/or modify it
8 * under the terms of the GNU General Public License as published by the
9 * Free Software Foundation; only version 2 of the License is applicable.
10 *
11 * This program is distributed in the hope that it will be useful, but
12 * WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License along
17 * with this program; if not, write to the Free Software Foundation, Inc.,
18 * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19 *
20 * Authors:
21 * Florian octo Forster <octo at verplant.org>
22 * Sebastian Harl <sh at tokkee.org>
23 * Mariusz Gronczewski <xani666 at gmail.com>
24 **/
26 #include "collectd.h"
27 #include "plugin.h"
28 #include "common.h"
29 #include "utils_avltree.h"
30 #include "utils_rrdcreate.h"
32 #include <rrd.h>
34 #if HAVE_PTHREAD_H
35 # include <pthread.h>
36 #endif
38 /*
39 * Private types
40 */
41 struct rrd_cache_s
42 {
43 int values_num;
44 char **values;
45 cdtime_t first_value;
46 cdtime_t last_value;
47 int64_t random_variation;
48 enum
49 {
50 FLAG_NONE = 0x00,
51 FLAG_QUEUED = 0x01,
52 FLAG_FLUSHQ = 0x02
53 } flags;
54 };
55 typedef struct rrd_cache_s rrd_cache_t;
57 enum rrd_queue_dir_e
58 {
59 QUEUE_INSERT_FRONT,
60 QUEUE_INSERT_BACK
61 };
62 typedef enum rrd_queue_dir_e rrd_queue_dir_t;
64 struct rrd_queue_s
65 {
66 char *filename;
67 struct rrd_queue_s *next;
68 };
69 typedef struct rrd_queue_s rrd_queue_t;
71 /*
72 * Private variables
73 */
74 static const char *config_keys[] =
75 {
76 "CacheTimeout",
77 "CacheFlush",
78 "CreateFilesAsync",
79 "DataDir",
80 "StepSize",
81 "HeartBeat",
82 "RRARows",
83 "RRATimespan",
84 "XFF",
85 "WritesPerSecond",
86 "RandomTimeout"
87 };
88 static int config_keys_num = STATIC_ARRAY_SIZE (config_keys);
90 /* If datadir is zero, the daemon's basedir is used. If stepsize or heartbeat
91 * is zero a default, depending on the `interval' member of the value list is
92 * being used. */
93 static char *datadir = NULL;
94 static double write_rate = 0.0;
95 static rrdcreate_config_t rrdcreate_config =
96 {
97 /* stepsize = */ 0,
98 /* heartbeat = */ 0,
99 /* rrarows = */ 1200,
100 /* xff = */ 0.1,
102 /* timespans = */ NULL,
103 /* timespans_num = */ 0,
105 /* consolidation_functions = */ NULL,
106 /* consolidation_functions_num = */ 0,
108 /* async = */ 0
109 };
111 /* XXX: If you need to lock both, cache_lock and queue_lock, at the same time,
112 * ALWAYS lock `cache_lock' first! */
113 static cdtime_t cache_timeout = 0;
114 static cdtime_t cache_flush_timeout = 0;
115 static cdtime_t random_timeout = TIME_T_TO_CDTIME_T (1);
116 static cdtime_t cache_flush_last;
117 static c_avl_tree_t *cache = NULL;
118 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
120 static rrd_queue_t *queue_head = NULL;
121 static rrd_queue_t *queue_tail = NULL;
122 static rrd_queue_t *flushq_head = NULL;
123 static rrd_queue_t *flushq_tail = NULL;
124 static pthread_t queue_thread;
125 static int queue_thread_running = 1;
126 static pthread_mutex_t queue_lock = PTHREAD_MUTEX_INITIALIZER;
127 static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
129 #if !HAVE_THREADSAFE_LIBRRD
130 static pthread_mutex_t librrd_lock = PTHREAD_MUTEX_INITIALIZER;
131 #endif
133 static int do_shutdown = 0;
135 #if HAVE_THREADSAFE_LIBRRD
136 static int srrd_update (char *filename, char *template,
137 int argc, const char **argv)
138 {
139 int status;
141 optind = 0; /* bug in librrd? */
142 rrd_clear_error ();
144 status = rrd_update_r (filename, template, argc, (void *) argv);
146 if (status != 0)
147 {
148 WARNING ("rrdtool plugin: rrd_update_r (%s) failed: %s",
149 filename, rrd_get_error ());
150 }
152 return (status);
153 } /* int srrd_update */
154 /* #endif HAVE_THREADSAFE_LIBRRD */
156 #else /* !HAVE_THREADSAFE_LIBRRD */
157 static int srrd_update (char *filename, char *template,
158 int argc, const char **argv)
159 {
160 int status;
162 int new_argc;
163 char **new_argv;
165 assert (template == NULL);
167 new_argc = 2 + argc;
168 new_argv = (char **) malloc ((new_argc + 1) * sizeof (char *));
169 if (new_argv == NULL)
170 {
171 ERROR ("rrdtool plugin: malloc failed.");
172 return (-1);
173 }
175 new_argv[0] = "update";
176 new_argv[1] = filename;
178 memcpy (new_argv + 2, argv, argc * sizeof (char *));
179 new_argv[new_argc] = NULL;
181 pthread_mutex_lock (&librrd_lock);
182 optind = 0; /* bug in librrd? */
183 rrd_clear_error ();
185 status = rrd_update (new_argc, new_argv);
186 pthread_mutex_unlock (&librrd_lock);
188 if (status != 0)
189 {
190 WARNING ("rrdtool plugin: rrd_update_r failed: %s: %s",
191 filename, rrd_get_error ());
192 }
194 sfree (new_argv);
196 return (status);
197 } /* int srrd_update */
198 #endif /* !HAVE_THREADSAFE_LIBRRD */
200 static int value_list_to_string (char *buffer, int buffer_len,
201 const data_set_t *ds, const value_list_t *vl)
202 {
203 int offset;
204 int status;
205 time_t tt;
206 int i;
208 memset (buffer, '\0', buffer_len);
210 tt = CDTIME_T_TO_TIME_T (vl->time);
211 status = ssnprintf (buffer, buffer_len, "%u", (unsigned int) tt);
212 if ((status < 1) || (status >= buffer_len))
213 return (-1);
214 offset = status;
216 for (i = 0; i < ds->ds_num; i++)
217 {
218 if ((ds->ds[i].type != DS_TYPE_COUNTER)
219 && (ds->ds[i].type != DS_TYPE_GAUGE)
220 && (ds->ds[i].type != DS_TYPE_DERIVE)
221 && (ds->ds[i].type != DS_TYPE_ABSOLUTE))
222 return (-1);
224 if (ds->ds[i].type == DS_TYPE_COUNTER)
225 status = ssnprintf (buffer + offset, buffer_len - offset,
226 ":%llu", vl->values[i].counter);
227 else if (ds->ds[i].type == DS_TYPE_GAUGE)
228 status = ssnprintf (buffer + offset, buffer_len - offset,
229 ":%lf", vl->values[i].gauge);
230 else if (ds->ds[i].type == DS_TYPE_DERIVE)
231 status = ssnprintf (buffer + offset, buffer_len - offset,
232 ":%"PRIi64, vl->values[i].derive);
233 else /*if (ds->ds[i].type == DS_TYPE_ABSOLUTE) */
234 status = ssnprintf (buffer + offset, buffer_len - offset,
235 ":%"PRIu64, vl->values[i].absolute);
237 if ((status < 1) || (status >= (buffer_len - offset)))
238 return (-1);
240 offset += status;
241 } /* for ds->ds_num */
243 return (0);
244 } /* int value_list_to_string */
246 static int value_list_to_filename (char *buffer, int buffer_len,
247 const data_set_t __attribute__((unused)) *ds, const value_list_t *vl)
248 {
249 int offset = 0;
250 int status;
252 if (datadir != NULL)
253 {
254 status = ssnprintf (buffer + offset, buffer_len - offset,
255 "%s/", datadir);
256 if ((status < 1) || (status >= buffer_len - offset))
257 return (-1);
258 offset += status;
259 }
261 status = ssnprintf (buffer + offset, buffer_len - offset,
262 "%s/", vl->host);
263 if ((status < 1) || (status >= buffer_len - offset))
264 return (-1);
265 offset += status;
267 if (strlen (vl->plugin_instance) > 0)
268 status = ssnprintf (buffer + offset, buffer_len - offset,
269 "%s-%s/", vl->plugin, vl->plugin_instance);
270 else
271 status = ssnprintf (buffer + offset, buffer_len - offset,
272 "%s/", vl->plugin);
273 if ((status < 1) || (status >= buffer_len - offset))
274 return (-1);
275 offset += status;
277 if (strlen (vl->type_instance) > 0)
278 status = ssnprintf (buffer + offset, buffer_len - offset,
279 "%s-%s.rrd", vl->type, vl->type_instance);
280 else
281 status = ssnprintf (buffer + offset, buffer_len - offset,
282 "%s.rrd", vl->type);
283 if ((status < 1) || (status >= buffer_len - offset))
284 return (-1);
285 offset += status;
287 return (0);
288 } /* int value_list_to_filename */
290 static void *rrd_queue_thread (void __attribute__((unused)) *data)
291 {
292 struct timeval tv_next_update;
293 struct timeval tv_now;
295 gettimeofday (&tv_next_update, /* timezone = */ NULL);
297 while (42)
298 {
299 rrd_queue_t *queue_entry;
300 rrd_cache_t *cache_entry;
301 char **values;
302 int values_num;
303 int status;
304 int i;
306 values = NULL;
307 values_num = 0;
309 pthread_mutex_lock (&queue_lock);
310 /* Wait for values to arrive */
311 while (42)
312 {
313 struct timespec ts_wait;
315 while ((flushq_head == NULL) && (queue_head == NULL)
316 && (do_shutdown == 0))
317 pthread_cond_wait (&queue_cond, &queue_lock);
319 if ((flushq_head == NULL) && (queue_head == NULL))
320 break;
322 /* Don't delay if there's something to flush */
323 if (flushq_head != NULL)
324 break;
326 /* Don't delay if we're shutting down */
327 if (do_shutdown != 0)
328 break;
330 /* Don't delay if no delay was configured. */
331 if (write_rate <= 0.0)
332 break;
334 gettimeofday (&tv_now, /* timezone = */ NULL);
335 status = timeval_cmp (tv_next_update, tv_now, NULL);
336 /* We're good to go */
337 if (status <= 0)
338 break;
340 /* We're supposed to wait a bit with this update, so we'll
341 * wait for the next addition to the queue or to the end of
342 * the wait period - whichever comes first. */
343 ts_wait.tv_sec = tv_next_update.tv_sec;
344 ts_wait.tv_nsec = 1000 * tv_next_update.tv_usec;
346 status = pthread_cond_timedwait (&queue_cond, &queue_lock,
347 &ts_wait);
348 if (status == ETIMEDOUT)
349 break;
350 } /* while (42) */
352 /* XXX: If you need to lock both, cache_lock and queue_lock, at
353 * the same time, ALWAYS lock `cache_lock' first! */
355 /* We're in the shutdown phase */
356 if ((flushq_head == NULL) && (queue_head == NULL))
357 {
358 pthread_mutex_unlock (&queue_lock);
359 break;
360 }
362 if (flushq_head != NULL)
363 {
364 /* Dequeue the first flush entry */
365 queue_entry = flushq_head;
366 if (flushq_head == flushq_tail)
367 flushq_head = flushq_tail = NULL;
368 else
369 flushq_head = flushq_head->next;
370 }
371 else /* if (queue_head != NULL) */
372 {
373 /* Dequeue the first regular entry */
374 queue_entry = queue_head;
375 if (queue_head == queue_tail)
376 queue_head = queue_tail = NULL;
377 else
378 queue_head = queue_head->next;
379 }
381 /* Unlock the queue again */
382 pthread_mutex_unlock (&queue_lock);
384 /* We now need the cache lock so the entry isn't updated while
385 * we make a copy of it's values */
386 pthread_mutex_lock (&cache_lock);
388 status = c_avl_get (cache, queue_entry->filename,
389 (void *) &cache_entry);
391 if (status == 0)
392 {
393 values = cache_entry->values;
394 values_num = cache_entry->values_num;
396 cache_entry->values = NULL;
397 cache_entry->values_num = 0;
398 cache_entry->flags = FLAG_NONE;
399 }
401 pthread_mutex_unlock (&cache_lock);
403 if (status != 0)
404 {
405 sfree (queue_entry->filename);
406 sfree (queue_entry);
407 continue;
408 }
410 /* Update `tv_next_update' */
411 if (write_rate > 0.0)
412 {
413 gettimeofday (&tv_now, /* timezone = */ NULL);
414 tv_next_update.tv_sec = tv_now.tv_sec;
415 tv_next_update.tv_usec = tv_now.tv_usec
416 + ((suseconds_t) (1000000 * write_rate));
417 while (tv_next_update.tv_usec > 1000000)
418 {
419 tv_next_update.tv_sec++;
420 tv_next_update.tv_usec -= 1000000;
421 }
422 }
424 /* Write the values to the RRD-file */
425 srrd_update (queue_entry->filename, NULL,
426 values_num, (const char **)values);
427 DEBUG ("rrdtool plugin: queue thread: Wrote %i value%s to %s",
428 values_num, (values_num == 1) ? "" : "s",
429 queue_entry->filename);
431 for (i = 0; i < values_num; i++)
432 {
433 sfree (values[i]);
434 }
435 sfree (values);
436 sfree (queue_entry->filename);
437 sfree (queue_entry);
438 } /* while (42) */
440 pthread_exit ((void *) 0);
441 return ((void *) 0);
442 } /* void *rrd_queue_thread */
444 static int rrd_queue_enqueue (const char *filename,
445 rrd_queue_t **head, rrd_queue_t **tail)
446 {
447 rrd_queue_t *queue_entry;
449 queue_entry = (rrd_queue_t *) malloc (sizeof (rrd_queue_t));
450 if (queue_entry == NULL)
451 return (-1);
453 queue_entry->filename = strdup (filename);
454 if (queue_entry->filename == NULL)
455 {
456 free (queue_entry);
457 return (-1);
458 }
460 queue_entry->next = NULL;
462 pthread_mutex_lock (&queue_lock);
464 if (*tail == NULL)
465 *head = queue_entry;
466 else
467 (*tail)->next = queue_entry;
468 *tail = queue_entry;
470 pthread_cond_signal (&queue_cond);
471 pthread_mutex_unlock (&queue_lock);
473 return (0);
474 } /* int rrd_queue_enqueue */
476 static int rrd_queue_dequeue (const char *filename,
477 rrd_queue_t **head, rrd_queue_t **tail)
478 {
479 rrd_queue_t *this;
480 rrd_queue_t *prev;
482 pthread_mutex_lock (&queue_lock);
484 prev = NULL;
485 this = *head;
487 while (this != NULL)
488 {
489 if (strcmp (this->filename, filename) == 0)
490 break;
492 prev = this;
493 this = this->next;
494 }
496 if (this == NULL)
497 {
498 pthread_mutex_unlock (&queue_lock);
499 return (-1);
500 }
502 if (prev == NULL)
503 *head = this->next;
504 else
505 prev->next = this->next;
507 if (this->next == NULL)
508 *tail = prev;
510 pthread_mutex_unlock (&queue_lock);
512 sfree (this->filename);
513 sfree (this);
515 return (0);
516 } /* int rrd_queue_dequeue */
518 /* XXX: You must hold "cache_lock" when calling this function! */
519 static void rrd_cache_flush (cdtime_t timeout)
520 {
521 rrd_cache_t *rc;
522 cdtime_t now;
524 char **keys = NULL;
525 int keys_num = 0;
527 char *key;
528 c_avl_iterator_t *iter;
529 int i;
531 DEBUG ("rrdtool plugin: Flushing cache, timeout = %.3f",
532 CDTIME_T_TO_DOUBLE (timeout));
534 now = cdtime ();
535 timeout = TIME_T_TO_CDTIME_T (timeout);
537 /* Build a list of entries to be flushed */
538 iter = c_avl_get_iterator (cache);
539 while (c_avl_iterator_next (iter, (void *) &key, (void *) &rc) == 0)
540 {
541 if (rc->flags != FLAG_NONE)
542 continue;
543 /* timeout == 0 => flush everything */
544 else if ((timeout != 0)
545 && ((now - rc->first_value) < timeout))
546 continue;
547 else if (rc->values_num > 0)
548 {
549 int status;
551 status = rrd_queue_enqueue (key, &queue_head, &queue_tail);
552 if (status == 0)
553 rc->flags = FLAG_QUEUED;
554 }
555 else /* ancient and no values -> waste of memory */
556 {
557 char **tmp = (char **) realloc ((void *) keys,
558 (keys_num + 1) * sizeof (char *));
559 if (tmp == NULL)
560 {
561 char errbuf[1024];
562 ERROR ("rrdtool plugin: "
563 "realloc failed: %s",
564 sstrerror (errno, errbuf,
565 sizeof (errbuf)));
566 c_avl_iterator_destroy (iter);
567 sfree (keys);
568 return;
569 }
570 keys = tmp;
571 keys[keys_num] = key;
572 keys_num++;
573 }
574 } /* while (c_avl_iterator_next) */
575 c_avl_iterator_destroy (iter);
577 for (i = 0; i < keys_num; i++)
578 {
579 if (c_avl_remove (cache, keys[i], (void *) &key, (void *) &rc) != 0)
580 {
581 DEBUG ("rrdtool plugin: c_avl_remove (%s) failed.", keys[i]);
582 continue;
583 }
585 assert (rc->values == NULL);
586 assert (rc->values_num == 0);
588 sfree (rc);
589 sfree (key);
590 keys[i] = NULL;
591 } /* for (i = 0..keys_num) */
593 sfree (keys);
595 cache_flush_last = now;
596 } /* void rrd_cache_flush */
598 static int rrd_cache_flush_identifier (cdtime_t timeout,
599 const char *identifier)
600 {
601 rrd_cache_t *rc;
602 cdtime_t now;
603 int status;
604 char key[2048];
606 if (identifier == NULL)
607 {
608 rrd_cache_flush (timeout);
609 return (0);
610 }
612 now = cdtime ();
614 if (datadir == NULL)
615 snprintf (key, sizeof (key), "%s.rrd",
616 identifier);
617 else
618 snprintf (key, sizeof (key), "%s/%s.rrd",
619 datadir, identifier);
620 key[sizeof (key) - 1] = 0;
622 status = c_avl_get (cache, key, (void *) &rc);
623 if (status != 0)
624 {
625 INFO ("rrdtool plugin: rrd_cache_flush_identifier: "
626 "c_avl_get (%s) failed. Does that file really exist?",
627 key);
628 return (status);
629 }
631 if (rc->flags == FLAG_FLUSHQ)
632 {
633 status = 0;
634 }
635 else if (rc->flags == FLAG_QUEUED)
636 {
637 rrd_queue_dequeue (key, &queue_head, &queue_tail);
638 status = rrd_queue_enqueue (key, &flushq_head, &flushq_tail);
639 if (status == 0)
640 rc->flags = FLAG_FLUSHQ;
641 }
642 else if ((now - rc->first_value) < timeout)
643 {
644 status = 0;
645 }
646 else if (rc->values_num > 0)
647 {
648 status = rrd_queue_enqueue (key, &flushq_head, &flushq_tail);
649 if (status == 0)
650 rc->flags = FLAG_FLUSHQ;
651 }
653 return (status);
654 } /* int rrd_cache_flush_identifier */
656 static int64_t rrd_get_random_variation (void)
657 {
658 double dbl_timeout;
659 cdtime_t ctm_timeout;
660 double rand_fact;
661 _Bool negative;
662 int64_t ret;
664 if (random_timeout <= 0)
665 return (0);
667 /* Assure that "cache_timeout + random_variation" is never negative. */
668 if (random_timeout > cache_timeout)
669 {
670 INFO ("rrdtool plugin: Adjusting \"RandomTimeout\" to %.3f seconds.",
671 CDTIME_T_TO_DOUBLE (cache_timeout));
672 random_timeout = cache_timeout;
673 }
675 /* This seems a bit complicated, but "random_timeout" is likely larger than
676 * RAND_MAX, so we can't simply use modulo here. */
677 dbl_timeout = CDTIME_T_TO_DOUBLE (random_timeout);
678 rand_fact = ((double) random ())
679 / ((double) RAND_MAX);
680 negative = (_Bool) (random () % 2);
682 ctm_timeout = DOUBLE_TO_CDTIME_T (dbl_timeout * rand_fact);
684 ret = (int64_t) ctm_timeout;
685 if (negative)
686 ret *= -1;
688 return (ret);
689 } /* int64_t rrd_get_random_variation */
691 static int rrd_cache_insert (const char *filename,
692 const char *value, cdtime_t value_time)
693 {
694 rrd_cache_t *rc = NULL;
695 int new_rc = 0;
696 char **values_new;
698 pthread_mutex_lock (&cache_lock);
700 /* This shouldn't happen, but it did happen at least once, so we'll be
701 * careful. */
702 if (cache == NULL)
703 {
704 pthread_mutex_unlock (&cache_lock);
705 WARNING ("rrdtool plugin: cache == NULL.");
706 return (-1);
707 }
709 c_avl_get (cache, filename, (void *) &rc);
711 if (rc == NULL)
712 {
713 rc = malloc (sizeof (*rc));
714 if (rc == NULL)
715 return (-1);
716 rc->values_num = 0;
717 rc->values = NULL;
718 rc->first_value = 0;
719 rc->last_value = 0;
720 rc->random_variation = rrd_get_random_variation ();
721 rc->flags = FLAG_NONE;
722 new_rc = 1;
723 }
725 if (rc->last_value >= value_time)
726 {
727 pthread_mutex_unlock (&cache_lock);
728 DEBUG ("rrdtool plugin: (rc->last_value = %"PRIu64") "
729 ">= (value_time = %"PRIu64")",
730 rc->last_value, value_time);
731 return (-1);
732 }
734 values_new = (char **) realloc ((void *) rc->values,
735 (rc->values_num + 1) * sizeof (char *));
736 if (values_new == NULL)
737 {
738 char errbuf[1024];
739 void *cache_key = NULL;
741 sstrerror (errno, errbuf, sizeof (errbuf));
743 c_avl_remove (cache, filename, &cache_key, NULL);
744 pthread_mutex_unlock (&cache_lock);
746 ERROR ("rrdtool plugin: realloc failed: %s", errbuf);
748 sfree (cache_key);
749 sfree (rc->values);
750 sfree (rc);
751 return (-1);
752 }
753 rc->values = values_new;
755 rc->values[rc->values_num] = strdup (value);
756 if (rc->values[rc->values_num] != NULL)
757 rc->values_num++;
759 if (rc->values_num == 1)
760 rc->first_value = value_time;
761 rc->last_value = value_time;
763 /* Insert if this is the first value */
764 if (new_rc == 1)
765 {
766 void *cache_key = strdup (filename);
768 if (cache_key == NULL)
769 {
770 char errbuf[1024];
771 sstrerror (errno, errbuf, sizeof (errbuf));
773 pthread_mutex_unlock (&cache_lock);
775 ERROR ("rrdtool plugin: strdup failed: %s", errbuf);
777 sfree (rc->values[0]);
778 sfree (rc->values);
779 sfree (rc);
780 return (-1);
781 }
783 c_avl_insert (cache, cache_key, rc);
784 }
786 DEBUG ("rrdtool plugin: rrd_cache_insert: file = %s; "
787 "values_num = %i; age = %.3f;",
788 filename, rc->values_num,
789 CDTIME_T_TO_DOUBLE (rc->last_value - rc->first_value));
791 if ((rc->last_value - rc->first_value) >= (cache_timeout + rc->random_variation))
792 {
793 /* XXX: If you need to lock both, cache_lock and queue_lock, at
794 * the same time, ALWAYS lock `cache_lock' first! */
795 if (rc->flags == FLAG_NONE)
796 {
797 int status;
799 status = rrd_queue_enqueue (filename, &queue_head, &queue_tail);
800 if (status == 0)
801 rc->flags = FLAG_QUEUED;
803 rc->random_variation = rrd_get_random_variation ();
804 }
805 else
806 {
807 DEBUG ("rrdtool plugin: `%s' is already queued.", filename);
808 }
809 }
811 if ((cache_timeout > 0) &&
812 ((cdtime () - cache_flush_last) > cache_flush_timeout))
813 rrd_cache_flush (cache_flush_timeout);
815 pthread_mutex_unlock (&cache_lock);
817 return (0);
818 } /* int rrd_cache_insert */
820 static int rrd_cache_destroy (void) /* {{{ */
821 {
822 void *key = NULL;
823 void *value = NULL;
825 int non_empty = 0;
827 pthread_mutex_lock (&cache_lock);
829 if (cache == NULL)
830 {
831 pthread_mutex_unlock (&cache_lock);
832 return (0);
833 }
835 while (c_avl_pick (cache, &key, &value) == 0)
836 {
837 rrd_cache_t *rc;
838 int i;
840 sfree (key);
841 key = NULL;
843 rc = value;
844 value = NULL;
846 if (rc->values_num > 0)
847 non_empty++;
849 for (i = 0; i < rc->values_num; i++)
850 sfree (rc->values[i]);
851 sfree (rc->values);
852 sfree (rc);
853 }
855 c_avl_destroy (cache);
856 cache = NULL;
858 if (non_empty > 0)
859 {
860 INFO ("rrdtool plugin: %i cache %s had values when destroying the cache.",
861 non_empty, (non_empty == 1) ? "entry" : "entries");
862 }
863 else
864 {
865 DEBUG ("rrdtool plugin: No values have been lost "
866 "when destroying the cache.");
867 }
869 pthread_mutex_unlock (&cache_lock);
870 return (0);
871 } /* }}} int rrd_cache_destroy */
873 static int rrd_compare_numeric (const void *a_ptr, const void *b_ptr)
874 {
875 int a = *((int *) a_ptr);
876 int b = *((int *) b_ptr);
878 if (a < b)
879 return (-1);
880 else if (a > b)
881 return (1);
882 else
883 return (0);
884 } /* int rrd_compare_numeric */
886 static int rrd_write (const data_set_t *ds, const value_list_t *vl,
887 user_data_t __attribute__((unused)) *user_data)
888 {
889 struct stat statbuf;
890 char filename[512];
891 char values[512];
892 int status;
894 if (do_shutdown)
895 return (0);
897 if (0 != strcmp (ds->type, vl->type)) {
898 ERROR ("rrdtool plugin: DS type does not match value list type");
899 return -1;
900 }
902 if (value_list_to_filename (filename, sizeof (filename), ds, vl) != 0)
903 return (-1);
905 if (value_list_to_string (values, sizeof (values), ds, vl) != 0)
906 return (-1);
908 if (stat (filename, &statbuf) == -1)
909 {
910 if (errno == ENOENT)
911 {
912 status = cu_rrd_create_file (filename,
913 ds, vl, &rrdcreate_config);
914 if (status != 0)
915 return (-1);
916 else if (rrdcreate_config.async)
917 return (0);
918 }
919 else
920 {
921 char errbuf[1024];
922 ERROR ("stat(%s) failed: %s", filename,
923 sstrerror (errno, errbuf,
924 sizeof (errbuf)));
925 return (-1);
926 }
927 }
928 else if (!S_ISREG (statbuf.st_mode))
929 {
930 ERROR ("stat(%s): Not a regular file!",
931 filename);
932 return (-1);
933 }
935 status = rrd_cache_insert (filename, values, vl->time);
937 return (status);
938 } /* int rrd_write */
940 static int rrd_flush (cdtime_t timeout, const char *identifier,
941 __attribute__((unused)) user_data_t *user_data)
942 {
943 pthread_mutex_lock (&cache_lock);
945 if (cache == NULL) {
946 pthread_mutex_unlock (&cache_lock);
947 return (0);
948 }
950 rrd_cache_flush_identifier (timeout, identifier);
952 pthread_mutex_unlock (&cache_lock);
953 return (0);
954 } /* int rrd_flush */
956 static int rrd_config (const char *key, const char *value)
957 {
958 if (strcasecmp ("CacheTimeout", key) == 0)
959 {
960 double tmp = atof (value);
961 if (tmp < 0)
962 {
963 fprintf (stderr, "rrdtool: `CacheTimeout' must "
964 "be greater than 0.\n");
965 ERROR ("rrdtool: `CacheTimeout' must "
966 "be greater than 0.\n");
967 return (1);
968 }
969 cache_timeout = DOUBLE_TO_CDTIME_T (tmp);
970 }
971 else if (strcasecmp ("CacheFlush", key) == 0)
972 {
973 int tmp = atoi (value);
974 if (tmp < 0)
975 {
976 fprintf (stderr, "rrdtool: `CacheFlush' must "
977 "be greater than 0.\n");
978 ERROR ("rrdtool: `CacheFlush' must "
979 "be greater than 0.\n");
980 return (1);
981 }
982 cache_flush_timeout = tmp;
983 }
984 else if (strcasecmp ("DataDir", key) == 0)
985 {
986 if (datadir != NULL)
987 free (datadir);
988 datadir = strdup (value);
989 if (datadir != NULL)
990 {
991 int len = strlen (datadir);
992 while ((len > 0) && (datadir[len - 1] == '/'))
993 {
994 len--;
995 datadir[len] = '\0';
996 }
997 if (len <= 0)
998 {
999 free (datadir);
1000 datadir = NULL;
1001 }
1002 }
1003 }
1004 else if (strcasecmp ("StepSize", key) == 0)
1005 {
1006 unsigned long temp = strtoul (value, NULL, 0);
1007 if (temp > 0)
1008 rrdcreate_config.stepsize = temp;
1009 }
1010 else if (strcasecmp ("HeartBeat", key) == 0)
1011 {
1012 int temp = atoi (value);
1013 if (temp > 0)
1014 rrdcreate_config.heartbeat = temp;
1015 }
1016 else if (strcasecmp ("CreateFilesAsync", key) == 0)
1017 {
1018 if (IS_TRUE (value))
1019 rrdcreate_config.async = 1;
1020 else
1021 rrdcreate_config.async = 0;
1022 }
1023 else if (strcasecmp ("RRARows", key) == 0)
1024 {
1025 int tmp = atoi (value);
1026 if (tmp <= 0)
1027 {
1028 fprintf (stderr, "rrdtool: `RRARows' must "
1029 "be greater than 0.\n");
1030 ERROR ("rrdtool: `RRARows' must "
1031 "be greater than 0.\n");
1032 return (1);
1033 }
1034 rrdcreate_config.rrarows = tmp;
1035 }
1036 else if (strcasecmp ("RRATimespan", key) == 0)
1037 {
1038 char *saveptr = NULL;
1039 char *dummy;
1040 char *ptr;
1041 char *value_copy;
1042 int *tmp_alloc;
1044 value_copy = strdup (value);
1045 if (value_copy == NULL)
1046 return (1);
1048 dummy = value_copy;
1049 while ((ptr = strtok_r (dummy, ", \t", &saveptr)) != NULL)
1050 {
1051 dummy = NULL;
1053 tmp_alloc = realloc (rrdcreate_config.timespans,
1054 sizeof (int) * (rrdcreate_config.timespans_num + 1));
1055 if (tmp_alloc == NULL)
1056 {
1057 fprintf (stderr, "rrdtool: realloc failed.\n");
1058 ERROR ("rrdtool: realloc failed.\n");
1059 free (value_copy);
1060 return (1);
1061 }
1062 rrdcreate_config.timespans = tmp_alloc;
1063 rrdcreate_config.timespans[rrdcreate_config.timespans_num] = atoi (ptr);
1064 if (rrdcreate_config.timespans[rrdcreate_config.timespans_num] != 0)
1065 rrdcreate_config.timespans_num++;
1066 } /* while (strtok_r) */
1068 qsort (/* base = */ rrdcreate_config.timespans,
1069 /* nmemb = */ rrdcreate_config.timespans_num,
1070 /* size = */ sizeof (rrdcreate_config.timespans[0]),
1071 /* compar = */ rrd_compare_numeric);
1073 free (value_copy);
1074 }
1075 else if (strcasecmp ("XFF", key) == 0)
1076 {
1077 double tmp = atof (value);
1078 if ((tmp < 0.0) || (tmp >= 1.0))
1079 {
1080 fprintf (stderr, "rrdtool: `XFF' must "
1081 "be in the range 0 to 1 (exclusive).");
1082 ERROR ("rrdtool: `XFF' must "
1083 "be in the range 0 to 1 (exclusive).");
1084 return (1);
1085 }
1086 rrdcreate_config.xff = tmp;
1087 }
1088 else if (strcasecmp ("WritesPerSecond", key) == 0)
1089 {
1090 double wps = atof (value);
1092 if (wps < 0.0)
1093 {
1094 fprintf (stderr, "rrdtool: `WritesPerSecond' must be "
1095 "greater than or equal to zero.");
1096 return (1);
1097 }
1098 else if (wps == 0.0)
1099 {
1100 write_rate = 0.0;
1101 }
1102 else
1103 {
1104 write_rate = 1.0 / wps;
1105 }
1106 }
1107 else if (strcasecmp ("RandomTimeout", key) == 0)
1108 {
1109 double tmp;
1111 tmp = atof (value);
1112 if (tmp < 0.0)
1113 {
1114 fprintf (stderr, "rrdtool: `RandomTimeout' must "
1115 "be greater than or equal to zero.\n");
1116 ERROR ("rrdtool: `RandomTimeout' must "
1117 "be greater then or equal to zero.");
1118 }
1119 else
1120 {
1121 random_timeout = DOUBLE_TO_CDTIME_T (tmp);
1122 }
1123 }
1124 else
1125 {
1126 return (-1);
1127 }
1128 return (0);
1129 } /* int rrd_config */
1131 static int rrd_shutdown (void)
1132 {
1133 pthread_mutex_lock (&cache_lock);
1134 rrd_cache_flush (0);
1135 pthread_mutex_unlock (&cache_lock);
1137 pthread_mutex_lock (&queue_lock);
1138 do_shutdown = 1;
1139 pthread_cond_signal (&queue_cond);
1140 pthread_mutex_unlock (&queue_lock);
1142 if ((queue_thread_running != 0)
1143 && ((queue_head != NULL) || (flushq_head != NULL)))
1144 {
1145 INFO ("rrdtool plugin: Shutting down the queue thread. "
1146 "This may take a while.");
1147 }
1148 else if (queue_thread_running != 0)
1149 {
1150 INFO ("rrdtool plugin: Shutting down the queue thread.");
1151 }
1153 /* Wait for all the values to be written to disk before returning. */
1154 if (queue_thread_running != 0)
1155 {
1156 pthread_join (queue_thread, NULL);
1157 memset (&queue_thread, 0, sizeof (queue_thread));
1158 queue_thread_running = 0;
1159 DEBUG ("rrdtool plugin: queue_thread exited.");
1160 }
1162 rrd_cache_destroy ();
1164 return (0);
1165 } /* int rrd_shutdown */
1167 static int rrd_init (void)
1168 {
1169 static int init_once = 0;
1170 int status;
1172 if (init_once != 0)
1173 return (0);
1174 init_once = 1;
1176 if (rrdcreate_config.heartbeat <= 0)
1177 rrdcreate_config.heartbeat = 2 * rrdcreate_config.stepsize;
1179 /* Set the cache up */
1180 pthread_mutex_lock (&cache_lock);
1182 cache = c_avl_create ((int (*) (const void *, const void *)) strcmp);
1183 if (cache == NULL)
1184 {
1185 ERROR ("rrdtool plugin: c_avl_create failed.");
1186 return (-1);
1187 }
1189 cache_flush_last = cdtime ();
1190 if (cache_timeout == 0)
1191 {
1192 cache_flush_timeout = 0;
1193 }
1194 else if (cache_flush_timeout < cache_timeout)
1195 cache_flush_timeout = 10 * cache_timeout;
1197 pthread_mutex_unlock (&cache_lock);
1199 status = plugin_thread_create (&queue_thread, /* attr = */ NULL,
1200 rrd_queue_thread, /* args = */ NULL);
1201 if (status != 0)
1202 {
1203 ERROR ("rrdtool plugin: Cannot create queue-thread.");
1204 return (-1);
1205 }
1206 queue_thread_running = 1;
1208 DEBUG ("rrdtool plugin: rrd_init: datadir = %s; stepsize = %lu;"
1209 " heartbeat = %i; rrarows = %i; xff = %lf;",
1210 (datadir == NULL) ? "(null)" : datadir,
1211 rrdcreate_config.stepsize,
1212 rrdcreate_config.heartbeat,
1213 rrdcreate_config.rrarows,
1214 rrdcreate_config.xff);
1216 return (0);
1217 } /* int rrd_init */
1219 void module_register (void)
1220 {
1221 plugin_register_config ("rrdtool", rrd_config,
1222 config_keys, config_keys_num);
1223 plugin_register_init ("rrdtool", rrd_init);
1224 plugin_register_write ("rrdtool", rrd_write, /* user_data = */ NULL);
1225 plugin_register_flush ("rrdtool", rrd_flush, /* user_data = */ NULL);
1226 plugin_register_shutdown ("rrdtool", rrd_shutdown);
1227 }