Code

Merge branch 'collectd-4.4'
[collectd.git] / src / rrdtool.c
1 /**
2  * collectd - src/rrdtool.c
3  * Copyright (C) 2006,2007  Florian octo Forster
4  *
5  * This program is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License as published by the
7  * Free Software Foundation; only version 2 of the License is applicable.
8  *
9  * This program is distributed in the hope that it will be useful, but
10  * WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * General Public License for more details.
13  *
14  * You should have received a copy of the GNU General Public License along
15  * with this program; if not, write to the Free Software Foundation, Inc.,
16  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
17  *
18  * Authors:
19  *   Florian octo Forster <octo at verplant.org>
20  **/
22 #include "collectd.h"
23 #include "plugin.h"
24 #include "common.h"
25 #include "utils_avltree.h"
27 #include <rrd.h>
29 #if HAVE_PTHREAD_H
30 # include <pthread.h>
31 #endif
33 /*
34  * Private types
35  */
36 struct rrd_cache_s
37 {
38         int    values_num;
39         char **values;
40         time_t first_value;
41         time_t last_value;
42         enum
43         {
44                 FLAG_NONE   = 0x00,
45                 FLAG_QUEUED = 0x01
46         } flags;
47 };
48 typedef struct rrd_cache_s rrd_cache_t;
50 struct rrd_queue_s
51 {
52         char *filename;
53         struct rrd_queue_s *next;
54 };
55 typedef struct rrd_queue_s rrd_queue_t;
57 /*
58  * Private variables
59  */
60 static int rra_timespans[] =
61 {
62         3600,
63         86400,
64         604800,
65         2678400,
66         31622400
67 };
68 static int rra_timespans_num = STATIC_ARRAY_SIZE (rra_timespans);
70 static int *rra_timespans_custom = NULL;
71 static int rra_timespans_custom_num = 0;
73 static char *rra_types[] =
74 {
75         "AVERAGE",
76         "MIN",
77         "MAX"
78 };
79 static int rra_types_num = STATIC_ARRAY_SIZE (rra_types);
81 static const char *config_keys[] =
82 {
83         "CacheTimeout",
84         "CacheFlush",
85         "DataDir",
86         "StepSize",
87         "HeartBeat",
88         "RRARows",
89         "RRATimespan",
90         "XFF"
91 };
92 static int config_keys_num = STATIC_ARRAY_SIZE (config_keys);
94 /* If datadir is zero, the daemon's basedir is used. If stepsize or heartbeat
95  * is zero a default, depending on the `interval' member of the value list is
96  * being used. */
97 static char   *datadir   = NULL;
98 static int     stepsize  = 0;
99 static int     heartbeat = 0;
100 static int     rrarows   = 1200;
101 static double  xff       = 0.1;
103 /* XXX: If you need to lock both, cache_lock and queue_lock, at the same time,
104  * ALWAYS lock `cache_lock' first! */
105 static int         cache_timeout = 0;
106 static int         cache_flush_timeout = 0;
107 static time_t      cache_flush_last;
108 static c_avl_tree_t *cache = NULL;
109 static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER;
111 static rrd_queue_t    *queue_head = NULL;
112 static rrd_queue_t    *queue_tail = NULL;
113 static pthread_t       queue_thread = 0;
114 static pthread_mutex_t queue_lock = PTHREAD_MUTEX_INITIALIZER;
115 static pthread_cond_t  queue_cond = PTHREAD_COND_INITIALIZER;
117 #if !HAVE_THREADSAFE_LIBRRD
118 static pthread_mutex_t librrd_lock = PTHREAD_MUTEX_INITIALIZER;
119 #endif
121 static int do_shutdown = 0;
123 /* * * * * * * * * *
124  * WARNING:  Magic *
125  * * * * * * * * * */
127 static void rra_free (int rra_num, char **rra_def)
129         int i;
131         for (i = 0; i < rra_num; i++)
132         {
133                 sfree (rra_def[i]);
134         }
135         sfree (rra_def);
136 } /* void rra_free */
138 static int rra_get (char ***ret, const value_list_t *vl)
140         char **rra_def;
141         int rra_num;
143         int *rts;
144         int  rts_num;
146         int rra_max;
148         int span;
150         int cdp_num;
151         int cdp_len;
152         int i, j;
154         char buffer[64];
156         /* The stepsize we use here: If it is user-set, use it. If not, use the
157          * interval of the value-list. */
158         int ss;
160         if (rrarows <= 0)
161         {
162                 *ret = NULL;
163                 return (-1);
164         }
166         ss = (stepsize > 0) ? stepsize : vl->interval;
167         if (ss <= 0)
168         {
169                 *ret = NULL;
170                 return (-1);
171         }
173         /* Use the configured timespans or fall back to the built-in defaults */
174         if (rra_timespans_custom_num != 0)
175         {
176                 rts = rra_timespans_custom;
177                 rts_num = rra_timespans_custom_num;
178         }
179         else
180         {
181                 rts = rra_timespans;
182                 rts_num = rra_timespans_num;
183         }
185         rra_max = rts_num * rra_types_num;
187         if ((rra_def = (char **) malloc ((rra_max + 1) * sizeof (char *))) == NULL)
188                 return (-1);
189         memset (rra_def, '\0', (rra_max + 1) * sizeof (char *));
190         rra_num = 0;
192         cdp_len = 0;
193         for (i = 0; i < rts_num; i++)
194         {
195                 span = rts[i];
197                 if ((span / ss) < rrarows)
198                         span = ss * rrarows;
200                 if (cdp_len == 0)
201                         cdp_len = 1;
202                 else
203                         cdp_len = (int) floor (((double) span)
204                                         / ((double) (rrarows * ss)));
206                 cdp_num = (int) ceil (((double) span)
207                                 / ((double) (cdp_len * ss)));
209                 for (j = 0; j < rra_types_num; j++)
210                 {
211                         if (rra_num >= rra_max)
212                                 break;
214                         if (ssnprintf (buffer, sizeof (buffer), "RRA:%s:%3.1f:%u:%u",
215                                                 rra_types[j], xff,
216                                                 cdp_len, cdp_num) >= sizeof (buffer))
217                         {
218                                 ERROR ("rra_get: Buffer would have been truncated.");
219                                 continue;
220                         }
222                         rra_def[rra_num++] = sstrdup (buffer);
223                 }
224         }
226 #if COLLECT_DEBUG
227         DEBUG ("rra_num = %i", rra_num);
228         for (i = 0; i < rra_num; i++)
229                 DEBUG ("  %s", rra_def[i]);
230 #endif
232         *ret = rra_def;
233         return (rra_num);
234 } /* int rra_get */
236 static void ds_free (int ds_num, char **ds_def)
238         int i;
240         for (i = 0; i < ds_num; i++)
241                 if (ds_def[i] != NULL)
242                         free (ds_def[i]);
243         free (ds_def);
246 static int ds_get (char ***ret, const data_set_t *ds, const value_list_t *vl)
248         char **ds_def;
249         int ds_num;
251         char min[32];
252         char max[32];
253         char buffer[128];
255         DEBUG ("ds->ds_num = %i", ds->ds_num);
257         ds_def = (char **) malloc (ds->ds_num * sizeof (char *));
258         if (ds_def == NULL)
259         {
260                 char errbuf[1024];
261                 ERROR ("rrdtool plugin: malloc failed: %s",
262                                 sstrerror (errno, errbuf, sizeof (errbuf)));
263                 return (-1);
264         }
265         memset (ds_def, '\0', ds->ds_num * sizeof (char *));
267         for (ds_num = 0; ds_num < ds->ds_num; ds_num++)
268         {
269                 data_source_t *d = ds->ds + ds_num;
270                 char *type;
271                 int status;
273                 ds_def[ds_num] = NULL;
275                 if (d->type == DS_TYPE_COUNTER)
276                         type = "COUNTER";
277                 else if (d->type == DS_TYPE_GAUGE)
278                         type = "GAUGE";
279                 else
280                 {
281                         ERROR ("rrdtool plugin: Unknown DS type: %i",
282                                         d->type);
283                         break;
284                 }
286                 if (isnan (d->min))
287                         strcpy (min, "U");
288                 else
289                         ssnprintf (min, sizeof (min), "%lf", d->min);
291                 if (isnan (d->max))
292                         strcpy (max, "U");
293                 else
294                         ssnprintf (max, sizeof (max), "%lf", d->max);
296                 status = ssnprintf (buffer, sizeof (buffer),
297                                 "DS:%s:%s:%i:%s:%s",
298                                 d->name, type,
299                                 (heartbeat > 0) ? heartbeat : (2 * vl->interval),
300                                 min, max);
301                 if ((status < 1) || (status >= sizeof (buffer)))
302                         break;
304                 ds_def[ds_num] = sstrdup (buffer);
305         } /* for ds_num = 0 .. ds->ds_num */
307 #if COLLECT_DEBUG
309         int i;
310         DEBUG ("ds_num = %i", ds_num);
311         for (i = 0; i < ds_num; i++)
312                 DEBUG ("  %s", ds_def[i]);
314 #endif
316         if (ds_num != ds->ds_num)
317         {
318                 ds_free (ds_num, ds_def);
319                 return (-1);
320         }
322         *ret = ds_def;
323         return (ds_num);
326 #if HAVE_THREADSAFE_LIBRRD
327 static int srrd_create (char *filename, unsigned long pdp_step, time_t last_up,
328                 int argc, char **argv)
330         int status;
332         optind = 0; /* bug in librrd? */
333         rrd_clear_error ();
335         status = rrd_create_r (filename, pdp_step, last_up, argc, argv);
337         if (status != 0)
338         {
339                 WARNING ("rrdtool plugin: rrd_create_r (%s) failed: %s",
340                                 filename, rrd_get_error ());
341         }
343         return (status);
344 } /* int srrd_create */
346 static int srrd_update (char *filename, char *template, int argc, char **argv)
348         int status;
350         optind = 0; /* bug in librrd? */
351         rrd_clear_error ();
353         status = rrd_update_r (filename, template, argc, argv);
355         if (status != 0)
356         {
357                 WARNING ("rrdtool plugin: rrd_update_r (%s) failed: %s",
358                                 filename, rrd_get_error ());
359         }
361         return (status);
362 } /* int srrd_update */
363 /* #endif HAVE_THREADSAFE_LIBRRD */
365 #else /* !HAVE_THREADSAFE_LIBRRD */
366 static int srrd_create (char *filename, unsigned long pdp_step, time_t last_up,
367                 int argc, char **argv)
369         int status;
371         int new_argc;
372         char **new_argv;
374         char pdp_step_str[16];
375         char last_up_str[16];
377         new_argc = 6 + argc;
378         new_argv = (char **) malloc ((new_argc + 1) * sizeof (char *));
379         if (new_argv == NULL)
380         {
381                 ERROR ("rrdtool plugin: malloc failed.");
382                 return (-1);
383         }
385         if (last_up == 0)
386                 last_up = time (NULL) - 10;
388         ssnprintf (pdp_step_str, sizeof (pdp_step_str), "%lu", pdp_step);
389         ssnprintf (last_up_str, sizeof (last_up_str), "%u", (unsigned int) last_up);
391         new_argv[0] = "create";
392         new_argv[1] = filename;
393         new_argv[2] = "-s";
394         new_argv[3] = pdp_step_str;
395         new_argv[4] = "-b";
396         new_argv[5] = last_up_str;
398         memcpy (new_argv + 6, argv, argc * sizeof (char *));
399         new_argv[new_argc] = NULL;
400         
401         pthread_mutex_lock (&librrd_lock);
402         optind = 0; /* bug in librrd? */
403         rrd_clear_error ();
405         status = rrd_create (new_argc, new_argv);
406         pthread_mutex_unlock (&librrd_lock);
408         if (status != 0)
409         {
410                 WARNING ("rrdtool plugin: rrd_create (%s) failed: %s",
411                                 filename, rrd_get_error ());
412         }
414         sfree (new_argv);
416         return (status);
417 } /* int srrd_create */
419 static int srrd_update (char *filename, char *template, int argc, char **argv)
421         int status;
423         int new_argc;
424         char **new_argv;
426         assert (template == NULL);
428         new_argc = 2 + argc;
429         new_argv = (char **) malloc ((new_argc + 1) * sizeof (char *));
430         if (new_argv == NULL)
431         {
432                 ERROR ("rrdtool plugin: malloc failed.");
433                 return (-1);
434         }
436         new_argv[0] = "update";
437         new_argv[1] = filename;
439         memcpy (new_argv + 2, argv, argc * sizeof (char *));
440         new_argv[new_argc] = NULL;
442         pthread_mutex_lock (&librrd_lock);
443         optind = 0; /* bug in librrd? */
444         rrd_clear_error ();
446         status = rrd_update (new_argc, new_argv);
447         pthread_mutex_unlock (&librrd_lock);
449         if (status != 0)
450         {
451                 WARNING ("rrdtool plugin: rrd_update_r failed: %s: %s",
452                                 argv[1], rrd_get_error ());
453         }
455         sfree (new_argv);
457         return (status);
458 } /* int srrd_update */
459 #endif /* !HAVE_THREADSAFE_LIBRRD */
461 static int rrd_create_file (char *filename, const data_set_t *ds, const value_list_t *vl)
463         char **argv;
464         int argc;
465         char **rra_def;
466         int rra_num;
467         char **ds_def;
468         int ds_num;
469         int status = 0;
471         if (check_create_dir (filename))
472                 return (-1);
474         if ((rra_num = rra_get (&rra_def, vl)) < 1)
475         {
476                 ERROR ("rrd_create_file failed: Could not calculate RRAs");
477                 return (-1);
478         }
480         if ((ds_num = ds_get (&ds_def, ds, vl)) < 1)
481         {
482                 ERROR ("rrd_create_file failed: Could not calculate DSes");
483                 return (-1);
484         }
486         argc = ds_num + rra_num;
488         if ((argv = (char **) malloc (sizeof (char *) * (argc + 1))) == NULL)
489         {
490                 char errbuf[1024];
491                 ERROR ("rrd_create failed: %s",
492                                 sstrerror (errno, errbuf, sizeof (errbuf)));
493                 return (-1);
494         }
496         memcpy (argv, ds_def, ds_num * sizeof (char *));
497         memcpy (argv + ds_num, rra_def, rra_num * sizeof (char *));
498         argv[ds_num + rra_num] = NULL;
500         assert (vl->time > 10);
501         status = srrd_create (filename,
502                         (stepsize > 0) ? stepsize : vl->interval,
503                         vl->time - 10,
504                         argc, argv);
506         free (argv);
507         ds_free (ds_num, ds_def);
508         rra_free (rra_num, rra_def);
510         return (status);
513 static int value_list_to_string (char *buffer, int buffer_len,
514                 const data_set_t *ds, const value_list_t *vl)
516         int offset;
517         int status;
518         int i;
520         memset (buffer, '\0', buffer_len);
522         status = ssnprintf (buffer, buffer_len, "%u", (unsigned int) vl->time);
523         if ((status < 1) || (status >= buffer_len))
524                 return (-1);
525         offset = status;
527         for (i = 0; i < ds->ds_num; i++)
528         {
529                 if ((ds->ds[i].type != DS_TYPE_COUNTER)
530                                 && (ds->ds[i].type != DS_TYPE_GAUGE))
531                         return (-1);
533                 if (ds->ds[i].type == DS_TYPE_COUNTER)
534                         status = ssnprintf (buffer + offset, buffer_len - offset,
535                                         ":%llu", vl->values[i].counter);
536                 else
537                         status = ssnprintf (buffer + offset, buffer_len - offset,
538                                         ":%lf", vl->values[i].gauge);
540                 if ((status < 1) || (status >= (buffer_len - offset)))
541                         return (-1);
543                 offset += status;
544         } /* for ds->ds_num */
546         return (0);
547 } /* int value_list_to_string */
549 static int value_list_to_filename (char *buffer, int buffer_len,
550                 const data_set_t *ds, const value_list_t *vl)
552         int offset = 0;
553         int status;
555         if (datadir != NULL)
556         {
557                 status = ssnprintf (buffer + offset, buffer_len - offset,
558                                 "%s/", datadir);
559                 if ((status < 1) || (status >= buffer_len - offset))
560                         return (-1);
561                 offset += status;
562         }
564         status = ssnprintf (buffer + offset, buffer_len - offset,
565                         "%s/", vl->host);
566         if ((status < 1) || (status >= buffer_len - offset))
567                 return (-1);
568         offset += status;
570         if (strlen (vl->plugin_instance) > 0)
571                 status = ssnprintf (buffer + offset, buffer_len - offset,
572                                 "%s-%s/", vl->plugin, vl->plugin_instance);
573         else
574                 status = ssnprintf (buffer + offset, buffer_len - offset,
575                                 "%s/", vl->plugin);
576         if ((status < 1) || (status >= buffer_len - offset))
577                 return (-1);
578         offset += status;
580         if (strlen (vl->type_instance) > 0)
581                 status = ssnprintf (buffer + offset, buffer_len - offset,
582                                 "%s-%s.rrd", vl->type, vl->type_instance);
583         else
584                 status = ssnprintf (buffer + offset, buffer_len - offset,
585                                 "%s.rrd", vl->type);
586         if ((status < 1) || (status >= buffer_len - offset))
587                 return (-1);
588         offset += status;
590         return (0);
591 } /* int value_list_to_filename */
593 static void *rrd_queue_thread (void *data)
595         while (42)
596         {
597                 rrd_queue_t *queue_entry;
598                 rrd_cache_t *cache_entry;
599                 char **values;
600                 int    values_num;
601                 int    i;
603                 /* XXX: If you need to lock both, cache_lock and queue_lock, at
604                  * the same time, ALWAYS lock `cache_lock' first! */
606                 /* wait until an entry is available */
607                 pthread_mutex_lock (&queue_lock);
608                 while ((queue_head == NULL) && (do_shutdown == 0))
609                         pthread_cond_wait (&queue_cond, &queue_lock);
611                 /* We're in the shutdown phase */
612                 if (queue_head == NULL)
613                 {
614                         pthread_mutex_unlock (&queue_lock);
615                         break;
616                 }
618                 /* Dequeue the first entry */
619                 queue_entry = queue_head;
620                 if (queue_head == queue_tail)
621                         queue_head = queue_tail = NULL;
622                 else
623                         queue_head = queue_head->next;
625                 /* Unlock the queue again */
626                 pthread_mutex_unlock (&queue_lock);
628                 /* We now need the cache lock so the entry isn't updated while
629                  * we make a copy of it's values */
630                 pthread_mutex_lock (&cache_lock);
632                 c_avl_get (cache, queue_entry->filename, (void *) &cache_entry);
634                 values = cache_entry->values;
635                 values_num = cache_entry->values_num;
637                 cache_entry->values = NULL;
638                 cache_entry->values_num = 0;
639                 cache_entry->flags = FLAG_NONE;
641                 pthread_mutex_unlock (&cache_lock);
643                 /* Write the values to the RRD-file */
644                 srrd_update (queue_entry->filename, NULL, values_num, values);
645                 DEBUG ("rrdtool plugin: queue thread: Wrote %i values to %s",
646                                 values_num, queue_entry->filename);
648                 for (i = 0; i < values_num; i++)
649                 {
650                         sfree (values[i]);
651                 }
652                 sfree (values);
653                 sfree (queue_entry->filename);
654                 sfree (queue_entry);
655         } /* while (42) */
657         pthread_mutex_lock (&cache_lock);
658         c_avl_destroy (cache);
659         cache = NULL;
660         pthread_mutex_unlock (&cache_lock);
662         pthread_exit ((void *) 0);
663         return ((void *) 0);
664 } /* void *rrd_queue_thread */
666 static int rrd_queue_cache_entry (const char *filename)
668         rrd_queue_t *queue_entry;
670         queue_entry = (rrd_queue_t *) malloc (sizeof (rrd_queue_t));
671         if (queue_entry == NULL)
672                 return (-1);
674         queue_entry->filename = strdup (filename);
675         if (queue_entry->filename == NULL)
676         {
677                 free (queue_entry);
678                 return (-1);
679         }
681         queue_entry->next = NULL;
683         pthread_mutex_lock (&queue_lock);
684         if (queue_tail == NULL)
685                 queue_head = queue_entry;
686         else
687                 queue_tail->next = queue_entry;
688         queue_tail = queue_entry;
689         pthread_cond_signal (&queue_cond);
690         pthread_mutex_unlock (&queue_lock);
692         DEBUG ("rrdtool plugin: Put `%s' into the update queue", filename);
694         return (0);
695 } /* int rrd_queue_cache_entry */
697 static void rrd_cache_flush (int timeout)
699         rrd_cache_t *rc;
700         time_t       now;
702         char **keys = NULL;
703         int    keys_num = 0;
705         char *key;
706         c_avl_iterator_t *iter;
707         int i;
709         DEBUG ("rrdtool plugin: Flushing cache, timeout = %i", timeout);
711         now = time (NULL);
713         /* Build a list of entries to be flushed */
714         iter = c_avl_get_iterator (cache);
715         while (c_avl_iterator_next (iter, (void *) &key, (void *) &rc) == 0)
716         {
717                 if (rc->flags == FLAG_QUEUED)
718                         continue;
719                 else if ((now - rc->first_value) < timeout)
720                         continue;
721                 else if (rc->values_num > 0)
722                 {
723                         if (rrd_queue_cache_entry (key) == 0)
724                                 rc->flags = FLAG_QUEUED;
725                 }
726                 else /* ancient and no values -> waste of memory */
727                 {
728                         char **tmp = (char **) realloc ((void *) keys,
729                                         (keys_num + 1) * sizeof (char *));
730                         if (tmp == NULL)
731                         {
732                                 char errbuf[1024];
733                                 ERROR ("rrdtool plugin: "
734                                                 "realloc failed: %s",
735                                                 sstrerror (errno, errbuf,
736                                                         sizeof (errbuf)));
737                                 c_avl_iterator_destroy (iter);
738                                 sfree (keys);
739                                 return;
740                         }
741                         keys = tmp;
742                         keys[keys_num] = key;
743                         keys_num++;
744                 }
745         } /* while (c_avl_iterator_next) */
746         c_avl_iterator_destroy (iter);
747         
748         for (i = 0; i < keys_num; i++)
749         {
750                 if (c_avl_remove (cache, keys[i], (void *) &key, (void *) &rc) != 0)
751                 {
752                         DEBUG ("rrdtool plugin: c_avl_remove (%s) failed.", keys[i]);
753                         continue;
754                 }
756                 assert (rc->values == NULL);
757                 assert (rc->values_num == 0);
759                 sfree (rc);
760                 sfree (key);
761                 keys[i] = NULL;
762         } /* for (i = 0..keys_num) */
764         sfree (keys);
766         cache_flush_last = now;
767 } /* void rrd_cache_flush */
769 static int rrd_cache_flush_identifier (int timeout, const char *identifier)
771   rrd_cache_t *rc;
772   time_t now;
773   int status;
774   char key[2048];
776   if (identifier == NULL)
777   {
778     rrd_cache_flush (timeout);
779     return (0);
780   }
782   now = time (NULL);
784   if (datadir == NULL)
785           snprintf (key, sizeof (key), "%s.rrd",
786                           identifier);
787   else
788           snprintf (key, sizeof (key), "%s/%s.rrd",
789                           datadir, identifier);
790   key[sizeof (key) - 1] = 0;
792   status = c_avl_get (cache, key, (void *) &rc);
793   if (status != 0)
794   {
795     WARNING ("rrdtool plugin: rrd_cache_flush_identifier: "
796         "c_avl_get (%s) failed. Does that file really exist?",
797         key);
798     return (status);
799   }
801   if (rc->flags == FLAG_QUEUED)
802     status = 0;
803   else if ((now - rc->first_value) < timeout)
804     status = 0;
805   else if (rc->values_num > 0)
806   {
807     status = rrd_queue_cache_entry (key);
808     if (status == 0)
809       rc->flags = FLAG_QUEUED;
810   }
812   return (status);
813 } /* int rrd_cache_flush_identifier */
815 static int rrd_cache_insert (const char *filename,
816                 const char *value, time_t value_time)
818         rrd_cache_t *rc = NULL;
819         int new_rc = 0;
820         char **values_new;
822         pthread_mutex_lock (&cache_lock);
824         c_avl_get (cache, filename, (void *) &rc);
826         if (rc == NULL)
827         {
828                 rc = (rrd_cache_t *) malloc (sizeof (rrd_cache_t));
829                 if (rc == NULL)
830                         return (-1);
831                 rc->values_num = 0;
832                 rc->values = NULL;
833                 rc->first_value = 0;
834                 rc->last_value = 0;
835                 rc->flags = FLAG_NONE;
836                 new_rc = 1;
837         }
839         if (rc->last_value >= value_time)
840         {
841                 pthread_mutex_unlock (&cache_lock);
842                 WARNING ("rrdtool plugin: (rc->last_value = %u) >= (value_time = %u)",
843                                 (unsigned int) rc->last_value,
844                                 (unsigned int) value_time);
845                 return (-1);
846         }
848         values_new = (char **) realloc ((void *) rc->values,
849                         (rc->values_num + 1) * sizeof (char *));
850         if (values_new == NULL)
851         {
852                 char errbuf[1024];
853                 void *cache_key = NULL;
855                 sstrerror (errno, errbuf, sizeof (errbuf));
857                 c_avl_remove (cache, filename, &cache_key, NULL);
858                 pthread_mutex_unlock (&cache_lock);
860                 ERROR ("rrdtool plugin: realloc failed: %s", errbuf);
862                 sfree (cache_key);
863                 sfree (rc->values);
864                 sfree (rc);
865                 return (-1);
866         }
867         rc->values = values_new;
869         rc->values[rc->values_num] = strdup (value);
870         if (rc->values[rc->values_num] != NULL)
871                 rc->values_num++;
873         if (rc->values_num == 1)
874                 rc->first_value = value_time;
875         rc->last_value = value_time;
877         /* Insert if this is the first value */
878         if (new_rc == 1)
879         {
880                 void *cache_key = strdup (filename);
882                 if (cache_key == NULL)
883                 {
884                         char errbuf[1024];
885                         sstrerror (errno, errbuf, sizeof (errbuf));
887                         pthread_mutex_unlock (&cache_lock);
889                         ERROR ("rrdtool plugin: strdup failed: %s", errbuf);
891                         sfree (rc->values[0]);
892                         sfree (rc->values);
893                         sfree (rc);
894                         return (-1);
895                 }
897                 c_avl_insert (cache, cache_key, rc);
898         }
900         DEBUG ("rrdtool plugin: rrd_cache_insert: file = %s; "
901                         "values_num = %i; age = %u;",
902                         filename, rc->values_num,
903                         rc->last_value - rc->first_value);
905         if ((rc->last_value - rc->first_value) >= cache_timeout)
906         {
907                 /* XXX: If you need to lock both, cache_lock and queue_lock, at
908                  * the same time, ALWAYS lock `cache_lock' first! */
909                 if (rc->flags != FLAG_QUEUED)
910                 {
911                         if (rrd_queue_cache_entry (filename) == 0)
912                                 rc->flags = FLAG_QUEUED;
913                 }
914                 else
915                 {
916                         DEBUG ("rrdtool plugin: `%s' is already queued.", filename);
917                 }
918         }
920         if ((cache_timeout > 0) &&
921                         ((time (NULL) - cache_flush_last) > cache_flush_timeout))
922                 rrd_cache_flush (cache_flush_timeout);
925         pthread_mutex_unlock (&cache_lock);
927         return (0);
928 } /* int rrd_cache_insert */
930 static int rrd_compare_numeric (const void *a_ptr, const void *b_ptr)
932         int a = *((int *) a_ptr);
933         int b = *((int *) b_ptr);
935         if (a < b)
936                 return (-1);
937         else if (a > b)
938                 return (1);
939         else
940                 return (0);
941 } /* int rrd_compare_numeric */
943 static int rrd_write (const data_set_t *ds, const value_list_t *vl)
945         struct stat  statbuf;
946         char         filename[512];
947         char         values[512];
948         int          status;
950         if (0 != strcmp (ds->type, vl->type)) {
951                 ERROR ("rrdtool plugin: DS type does not match value list type");
952                 return -1;
953         }
955         if (value_list_to_filename (filename, sizeof (filename), ds, vl) != 0)
956                 return (-1);
958         if (value_list_to_string (values, sizeof (values), ds, vl) != 0)
959                 return (-1);
961         if (stat (filename, &statbuf) == -1)
962         {
963                 if (errno == ENOENT)
964                 {
965                         if (rrd_create_file (filename, ds, vl))
966                                 return (-1);
967                 }
968                 else
969                 {
970                         char errbuf[1024];
971                         ERROR ("stat(%s) failed: %s", filename,
972                                         sstrerror (errno, errbuf,
973                                                 sizeof (errbuf)));
974                         return (-1);
975                 }
976         }
977         else if (!S_ISREG (statbuf.st_mode))
978         {
979                 ERROR ("stat(%s): Not a regular file!",
980                                 filename);
981                 return (-1);
982         }
984         status = rrd_cache_insert (filename, values, vl->time);
986         return (status);
987 } /* int rrd_write */
989 static int rrd_flush (int timeout, const char *identifier)
991         pthread_mutex_lock (&cache_lock);
993         if (cache == NULL) {
994                 pthread_mutex_unlock (&cache_lock);
995                 return (0);
996         }
998         rrd_cache_flush_identifier (timeout, identifier);
1000         pthread_mutex_unlock (&cache_lock);
1001         return (0);
1002 } /* int rrd_flush */
1004 static int rrd_config (const char *key, const char *value)
1006         if (strcasecmp ("CacheTimeout", key) == 0)
1007         {
1008                 int tmp = atoi (value);
1009                 if (tmp < 0)
1010                 {
1011                         fprintf (stderr, "rrdtool: `CacheTimeout' must "
1012                                         "be greater than 0.\n");
1013                         return (1);
1014                 }
1015                 cache_timeout = tmp;
1016         }
1017         else if (strcasecmp ("CacheFlush", key) == 0)
1018         {
1019                 int tmp = atoi (value);
1020                 if (tmp < 0)
1021                 {
1022                         fprintf (stderr, "rrdtool: `CacheFlush' must "
1023                                         "be greater than 0.\n");
1024                         return (1);
1025                 }
1026                 cache_flush_timeout = tmp;
1027         }
1028         else if (strcasecmp ("DataDir", key) == 0)
1029         {
1030                 if (datadir != NULL)
1031                         free (datadir);
1032                 datadir = strdup (value);
1033                 if (datadir != NULL)
1034                 {
1035                         int len = strlen (datadir);
1036                         while ((len > 0) && (datadir[len - 1] == '/'))
1037                         {
1038                                 len--;
1039                                 datadir[len] = '\0';
1040                         }
1041                         if (len <= 0)
1042                         {
1043                                 free (datadir);
1044                                 datadir = NULL;
1045                         }
1046                 }
1047         }
1048         else if (strcasecmp ("StepSize", key) == 0)
1049         {
1050                 stepsize = atoi (value);
1051                 if (stepsize < 0)
1052                         stepsize = 0;
1053         }
1054         else if (strcasecmp ("HeartBeat", key) == 0)
1055         {
1056                 heartbeat = atoi (value);
1057                 if (heartbeat < 0)
1058                         heartbeat = 0;
1059         }
1060         else if (strcasecmp ("RRARows", key) == 0)
1061         {
1062                 int tmp = atoi (value);
1063                 if (tmp <= 0)
1064                 {
1065                         fprintf (stderr, "rrdtool: `RRARows' must "
1066                                         "be greater than 0.\n");
1067                         return (1);
1068                 }
1069                 rrarows = tmp;
1070         }
1071         else if (strcasecmp ("RRATimespan", key) == 0)
1072         {
1073                 char *saveptr = NULL;
1074                 char *dummy;
1075                 char *ptr;
1076                 char *value_copy;
1077                 int *tmp_alloc;
1079                 value_copy = strdup (value);
1080                 if (value_copy == NULL)
1081                         return (1);
1083                 dummy = value_copy;
1084                 while ((ptr = strtok_r (dummy, ", \t", &saveptr)) != NULL)
1085                 {
1086                         dummy = NULL;
1087                         
1088                         tmp_alloc = realloc (rra_timespans_custom,
1089                                         sizeof (int) * (rra_timespans_custom_num + 1));
1090                         if (tmp_alloc == NULL)
1091                         {
1092                                 fprintf (stderr, "rrdtool: realloc failed.\n");
1093                                 free (value_copy);
1094                                 return (1);
1095                         }
1096                         rra_timespans_custom = tmp_alloc;
1097                         rra_timespans_custom[rra_timespans_custom_num] = atoi (ptr);
1098                         if (rra_timespans_custom[rra_timespans_custom_num] != 0)
1099                                 rra_timespans_custom_num++;
1100                 } /* while (strtok_r) */
1102                 qsort (/* base = */ rra_timespans_custom,
1103                                 /* nmemb  = */ rra_timespans_custom_num,
1104                                 /* size   = */ sizeof (rra_timespans_custom[0]),
1105                                 /* compar = */ rrd_compare_numeric);
1107                 free (value_copy);
1108         }
1109         else if (strcasecmp ("XFF", key) == 0)
1110         {
1111                 double tmp = atof (value);
1112                 if ((tmp < 0.0) || (tmp >= 1.0))
1113                 {
1114                         fprintf (stderr, "rrdtool: `XFF' must "
1115                                         "be in the range 0 to 1 (exclusive).");
1116                         return (1);
1117                 }
1118                 xff = tmp;
1119         }
1120         else
1121         {
1122                 return (-1);
1123         }
1124         return (0);
1125 } /* int rrd_config */
1127 static int rrd_shutdown (void)
1129         pthread_mutex_lock (&cache_lock);
1130         rrd_cache_flush (-1);
1131         pthread_mutex_unlock (&cache_lock);
1133         pthread_mutex_lock (&queue_lock);
1134         do_shutdown = 1;
1135         pthread_cond_signal (&queue_cond);
1136         pthread_mutex_unlock (&queue_lock);
1138         /* Wait for all the values to be written to disk before returning. */
1139         if (queue_thread != 0)
1140         {
1141                 pthread_join (queue_thread, NULL);
1142                 queue_thread = 0;
1143                 DEBUG ("rrdtool plugin: queue_thread exited.");
1144         }
1146         return (0);
1147 } /* int rrd_shutdown */
1149 static int rrd_init (void)
1151         int status;
1153         if (stepsize < 0)
1154                 stepsize = 0;
1155         if (heartbeat <= 0)
1156                 heartbeat = 2 * stepsize;
1158         if ((heartbeat > 0) && (heartbeat < interval_g))
1159                 WARNING ("rrdtool plugin: Your `heartbeat' is "
1160                                 "smaller than your `interval'. This will "
1161                                 "likely cause problems.");
1162         else if ((stepsize > 0) && (stepsize < interval_g))
1163                 WARNING ("rrdtool plugin: Your `stepsize' is "
1164                                 "smaller than your `interval'. This will "
1165                                 "create needlessly big RRD-files.");
1167         /* Set the cache up */
1168         pthread_mutex_lock (&cache_lock);
1170         cache = c_avl_create ((int (*) (const void *, const void *)) strcmp);
1171         if (cache == NULL)
1172         {
1173                 ERROR ("rrdtool plugin: c_avl_create failed.");
1174                 return (-1);
1175         }
1177         cache_flush_last = time (NULL);
1178         if (cache_timeout < 2)
1179         {
1180                 cache_timeout = 0;
1181                 cache_flush_timeout = 0;
1182         }
1183         else if (cache_flush_timeout < cache_timeout)
1184                 cache_flush_timeout = 10 * cache_timeout;
1186         pthread_mutex_unlock (&cache_lock);
1188         status = pthread_create (&queue_thread, NULL, rrd_queue_thread, NULL);
1189         if (status != 0)
1190         {
1191                 ERROR ("rrdtool plugin: Cannot create queue-thread.");
1192                 return (-1);
1193         }
1195         DEBUG ("rrdtool plugin: rrd_init: datadir = %s; stepsize = %i;"
1196                         " heartbeat = %i; rrarows = %i; xff = %lf;",
1197                         (datadir == NULL) ? "(null)" : datadir,
1198                         stepsize, heartbeat, rrarows, xff);
1200         return (0);
1201 } /* int rrd_init */
1203 void module_register (void)
1205         plugin_register_config ("rrdtool", rrd_config,
1206                         config_keys, config_keys_num);
1207         plugin_register_init ("rrdtool", rrd_init);
1208         plugin_register_write ("rrdtool", rrd_write);
1209         plugin_register_flush ("rrdtool", rrd_flush);
1210         plugin_register_shutdown ("rrdtool", rrd_shutdown);