Code

fix use of setlocale all over the place ...
[rrdtool.git] / src / rrd_update.c
1 /*****************************************************************************
2  * RRDtool 1.4.4  Copyright by Tobi Oetiker, 1997-2010
3  *                Copyright by Florian Forster, 2008
4  *****************************************************************************
5  * rrd_update.c  RRD Update Function
6  *****************************************************************************
7  * $Id$
8  *****************************************************************************/
10 #include "rrd_tool.h"
12 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
13 #include <sys/locking.h>
14 #include <sys/stat.h>
15 #include <io.h>
16 #endif
18 #include <locale.h>
20 #include "rrd_hw.h"
21 #include "rrd_rpncalc.h"
23 #include "rrd_is_thread_safe.h"
24 #include "unused.h"
26 #include "rrd_client.h"
28 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
29 /*
30  * WIN32 does not have gettimeofday     and struct timeval. This is a quick and dirty
31  * replacement.
32  */
33 #include <sys/timeb.h>
35 #ifndef __MINGW32__
36 struct timeval {
37     time_t    tv_sec;   /* seconds */
38     long      tv_usec;  /* microseconds */
39 };
40 #endif
42 struct __timezone {
43     int       tz_minuteswest;   /* minutes W of Greenwich */
44     int       tz_dsttime;   /* type of dst correction */
45 };
47 static int gettimeofday(
48     struct timeval *t,
49     struct __timezone *tz)
50 {
52     struct _timeb current_time;
54     _ftime(&current_time);
56     t->tv_sec = current_time.time;
57     t->tv_usec = current_time.millitm * 1000;
59     return 0;
60 }
62 #endif
64 /* FUNCTION PROTOTYPES */
66 int       rrd_update_r(
67     const char *filename,
68     const char *tmplt,
69     int argc,
70     const char **argv);
71 int       _rrd_update(
72     const char *filename,
73     const char *tmplt,
74     int argc,
75     const char **argv,
76     rrd_info_t *);
78 static int allocate_data_structures(
79     rrd_t *rrd,
80     char ***updvals,
81     rrd_value_t **pdp_temp,
82     const char *tmplt,
83     long **tmpl_idx,
84     unsigned long *tmpl_cnt,
85     unsigned long **rra_step_cnt,
86     unsigned long **skip_update,
87     rrd_value_t **pdp_new);
89 static int parse_template(
90     rrd_t *rrd,
91     const char *tmplt,
92     unsigned long *tmpl_cnt,
93     long *tmpl_idx);
95 static int process_arg(
96     char *step_start,
97     rrd_t *rrd,
98     rrd_file_t *rrd_file,
99     unsigned long rra_begin,
100     time_t *current_time,
101     unsigned long *current_time_usec,
102     rrd_value_t *pdp_temp,
103     rrd_value_t *pdp_new,
104     unsigned long *rra_step_cnt,
105     char **updvals,
106     long *tmpl_idx,
107     unsigned long tmpl_cnt,
108     rrd_info_t ** pcdp_summary,
109     int version,
110     unsigned long *skip_update,
111     int *schedule_smooth);
113 static int parse_ds(
114     rrd_t *rrd,
115     char **updvals,
116     long *tmpl_idx,
117     char *input,
118     unsigned long tmpl_cnt,
119     time_t *current_time,
120     unsigned long *current_time_usec,
121     int version);
123 static int get_time_from_reading(
124     rrd_t *rrd,
125     char timesyntax,
126     char **updvals,
127     time_t *current_time,
128     unsigned long *current_time_usec,
129     int version);
131 static int update_pdp_prep(
132     rrd_t *rrd,
133     char **updvals,
134     rrd_value_t *pdp_new,
135     double interval);
137 static int calculate_elapsed_steps(
138     rrd_t *rrd,
139     unsigned long current_time,
140     unsigned long current_time_usec,
141     double interval,
142     double *pre_int,
143     double *post_int,
144     unsigned long *proc_pdp_cnt);
146 static void simple_update(
147     rrd_t *rrd,
148     double interval,
149     rrd_value_t *pdp_new);
151 static int process_all_pdp_st(
152     rrd_t *rrd,
153     double interval,
154     double pre_int,
155     double post_int,
156     unsigned long elapsed_pdp_st,
157     rrd_value_t *pdp_new,
158     rrd_value_t *pdp_temp);
160 static int process_pdp_st(
161     rrd_t *rrd,
162     unsigned long ds_idx,
163     double interval,
164     double pre_int,
165     double post_int,
166     long diff_pdp_st,
167     rrd_value_t *pdp_new,
168     rrd_value_t *pdp_temp);
170 static int update_all_cdp_prep(
171     rrd_t *rrd,
172     unsigned long *rra_step_cnt,
173     unsigned long rra_begin,
174     rrd_file_t *rrd_file,
175     unsigned long elapsed_pdp_st,
176     unsigned long proc_pdp_cnt,
177     rrd_value_t **last_seasonal_coef,
178     rrd_value_t **seasonal_coef,
179     rrd_value_t *pdp_temp,
180     unsigned long *skip_update,
181     int *schedule_smooth);
183 static int do_schedule_smooth(
184     rrd_t *rrd,
185     unsigned long rra_idx,
186     unsigned long elapsed_pdp_st);
188 static int update_cdp_prep(
189     rrd_t *rrd,
190     unsigned long elapsed_pdp_st,
191     unsigned long start_pdp_offset,
192     unsigned long *rra_step_cnt,
193     int rra_idx,
194     rrd_value_t *pdp_temp,
195     rrd_value_t *last_seasonal_coef,
196     rrd_value_t *seasonal_coef,
197     int current_cf);
199 static void update_cdp(
200     unival *scratch,
201     int current_cf,
202     rrd_value_t pdp_temp_val,
203     unsigned long rra_step_cnt,
204     unsigned long elapsed_pdp_st,
205     unsigned long start_pdp_offset,
206     unsigned long pdp_cnt,
207     rrd_value_t xff,
208     int i,
209     int ii);
211 static void initialize_cdp_val(
212     unival *scratch,
213     int current_cf,
214     rrd_value_t pdp_temp_val,
215     unsigned long start_pdp_offset,
216     unsigned long pdp_cnt);
218 static void reset_cdp(
219     rrd_t *rrd,
220     unsigned long elapsed_pdp_st,
221     rrd_value_t *pdp_temp,
222     rrd_value_t *last_seasonal_coef,
223     rrd_value_t *seasonal_coef,
224     int rra_idx,
225     int ds_idx,
226     int cdp_idx,
227     enum cf_en current_cf);
229 static rrd_value_t initialize_carry_over(
230     rrd_value_t pdp_temp_val,
231     int         current_cf,
232     unsigned long elapsed_pdp_st,
233     unsigned long start_pdp_offset,
234     unsigned long pdp_cnt);
236 static rrd_value_t calculate_cdp_val(
237     rrd_value_t cdp_val,
238     rrd_value_t pdp_temp_val,
239     unsigned long elapsed_pdp_st,
240     int current_cf,
241     int i,
242     int ii);
244 static int update_aberrant_cdps(
245     rrd_t *rrd,
246     rrd_file_t *rrd_file,
247     unsigned long rra_begin,
248     unsigned long elapsed_pdp_st,
249     rrd_value_t *pdp_temp,
250     rrd_value_t **seasonal_coef);
252 static int write_to_rras(
253     rrd_t *rrd,
254     rrd_file_t *rrd_file,
255     unsigned long *rra_step_cnt,
256     unsigned long rra_begin,
257     time_t current_time,
258     unsigned long *skip_update,
259     rrd_info_t ** pcdp_summary);
261 static int write_RRA_row(
262     rrd_file_t *rrd_file,
263     rrd_t *rrd,
264     unsigned long rra_idx,
265     unsigned short CDP_scratch_idx,
266     rrd_info_t ** pcdp_summary,
267     time_t rra_time);
269 static int smooth_all_rras(
270     rrd_t *rrd,
271     rrd_file_t *rrd_file,
272     unsigned long rra_begin);
274 #ifndef HAVE_MMAP
275 static int write_changes_to_disk(
276     rrd_t *rrd,
277     rrd_file_t *rrd_file,
278     int version);
279 #endif
281 /*
282  * normalize time as returned by gettimeofday. usec part must
283  * be always >= 0
284  */
285 static void normalize_time(
286     struct timeval *t)
288     if (t->tv_usec < 0) {
289         t->tv_sec--;
290         t->tv_usec += 1e6L;
291     }
294 /*
295  * Sets current_time and current_time_usec based on the current time.
296  * current_time_usec is set to 0 if the version number is 1 or 2.
297  */
298 static void initialize_time(
299     time_t *current_time,
300     unsigned long *current_time_usec,
301     int version)
303     struct timeval tmp_time;    /* used for time conversion */
305     gettimeofday(&tmp_time, 0);
306     normalize_time(&tmp_time);
307     *current_time = tmp_time.tv_sec;
308     if (version >= 3) {
309         *current_time_usec = tmp_time.tv_usec;
310     } else {
311         *current_time_usec = 0;
312     }
315 #define IFDNAN(X,Y) (isnan(X) ? (Y) : (X));
317 rrd_info_t *rrd_update_v(
318     int argc,
319     char **argv)
321     char     *tmplt = NULL;
322     rrd_info_t *result = NULL;
323     rrd_infoval_t rc;
324     char *opt_daemon = NULL;
325     struct option long_options[] = {
326         {"template", required_argument, 0, 't'},
327         {0, 0, 0, 0}
328     };
330     rc.u_int = -1;
331     optind = 0;
332     opterr = 0;         /* initialize getopt */
334     while (1) {
335         int       option_index = 0;
336         int       opt;
338         opt = getopt_long(argc, argv, "t:", long_options, &option_index);
340         if (opt == EOF)
341             break;
343         switch (opt) {
344         case 't':
345             tmplt = optarg;
346             break;
348         case '?':
349             rrd_set_error("unknown option '%s'", argv[optind - 1]);
350             goto end_tag;
351         }
352     }
354     opt_daemon = getenv (ENV_RRDCACHED_ADDRESS);
355     if (opt_daemon != NULL) {
356         rrd_set_error ("The \"%s\" environment variable is defined, "
357                 "but \"%s\" cannot work with rrdcached. Either unset "
358                 "the environment variable or use \"update\" instead.",
359                 ENV_RRDCACHED_ADDRESS, argv[0]);
360         goto end_tag;
361     }
363     /* need at least 2 arguments: filename, data. */
364     if (argc - optind < 2) {
365         rrd_set_error("Not enough arguments");
366         goto end_tag;
367     }
368     rc.u_int = 0;
369     result = rrd_info_push(NULL, sprintf_alloc("return_value"), RD_I_INT, rc);
370     rc.u_int = _rrd_update(argv[optind], tmplt,
371                            argc - optind - 1,
372                            (const char **) (argv + optind + 1), result);
373     result->value.u_int = rc.u_int;
374   end_tag:
375     return result;
378 int rrd_update(
379     int argc,
380     char **argv)
382     struct option long_options[] = {
383         {"template", required_argument, 0, 't'},
384         {"daemon",   required_argument, 0, 'd'},
385         {0, 0, 0, 0}
386     };
387     int       option_index = 0;
388     int       opt;
389     char     *tmplt = NULL;
390     int       rc = -1;
391     char     *opt_daemon = NULL;
393     optind = 0;
394     opterr = 0;         /* initialize getopt */
396     while (1) {
397         opt = getopt_long(argc, argv, "t:d:", long_options, &option_index);
399         if (opt == EOF)
400             break;
402         switch (opt) {
403         case 't':
404             tmplt = strdup(optarg);
405             break;
407         case 'd':
408             if (opt_daemon != NULL)
409                 free (opt_daemon);
410             opt_daemon = strdup (optarg);
411             if (opt_daemon == NULL)
412             {
413                 rrd_set_error("strdup failed.");
414                 goto out;
415             }
416             break;
418         case '?':
419             rrd_set_error("unknown option '%s'", argv[optind - 1]);
420             goto out;
421         }
422     }
424     /* need at least 2 arguments: filename, data. */
425     if (argc - optind < 2) {
426         rrd_set_error("Not enough arguments");
427         goto out;
428     }
430     {   /* try to connect to rrdcached */
431         int status = rrdc_connect(opt_daemon);
432         if (status != 0) return status;
433     }
435     if ((tmplt != NULL) && rrdc_is_connected(opt_daemon))
436     {
437         rrd_set_error("The caching daemon cannot be used together with "
438                 "templates yet.");
439         goto out;
440     }
442     if (! rrdc_is_connected(opt_daemon))
443     {
444       rc = rrd_update_r(argv[optind], tmplt,
445                         argc - optind - 1, (const char **) (argv + optind + 1));
446     }
447     else /* we are connected */
448     {
449         rc = rrdc_update (argv[optind], /* file */
450                           argc - optind - 1, /* values_num */
451                           (const char *const *) (argv + optind + 1)); /* values */
452         if (rc > 0)
453             rrd_set_error("Failed sending the values to rrdcached: %s",
454                           rrd_strerror (rc));
455     }
457   out:
458     if (tmplt != NULL)
459     {
460         free(tmplt);
461         tmplt = NULL;
462     }
463     if (opt_daemon != NULL)
464     {
465         free (opt_daemon);
466         opt_daemon = NULL;
467     }
468     return rc;
471 int rrd_update_r(
472     const char *filename,
473     const char *tmplt,
474     int argc,
475     const char **argv)
477     return _rrd_update(filename, tmplt, argc, argv, NULL);
480 int _rrd_update(
481     const char *filename,
482     const char *tmplt,
483     int argc,
484     const char **argv,
485     rrd_info_t * pcdp_summary)
488     int       arg_i = 2;
490     unsigned long rra_begin;    /* byte pointer to the rra
491                                  * area in the rrd file.  this
492                                  * pointer never changes value */
493     rrd_value_t *pdp_new;   /* prepare the incoming data to be added 
494                              * to the existing entry */
495     rrd_value_t *pdp_temp;  /* prepare the pdp values to be added 
496                              * to the cdp values */
498     long     *tmpl_idx; /* index representing the settings
499                          * transported by the tmplt index */
500     unsigned long tmpl_cnt = 2; /* time and data */
501     rrd_t     rrd;
502     time_t    current_time = 0;
503     unsigned long current_time_usec = 0;    /* microseconds part of current time */
504     char    **updvals;
505     int       schedule_smooth = 0;
507     /* number of elapsed PDP steps since last update */
508     unsigned long *rra_step_cnt = NULL;
510     int       version;  /* rrd version */
511     rrd_file_t *rrd_file;
512     char     *arg_copy; /* for processing the argv */
513     unsigned long *skip_update; /* RRAs to advance but not write */
515     /* need at least 1 arguments: data. */
516     if (argc < 1) {
517         rrd_set_error("Not enough arguments");
518         goto err_out;
519     }
521     rrd_init(&rrd);
522     if ((rrd_file = rrd_open(filename, &rrd, RRD_READWRITE)) == NULL) {
523         goto err_free;
524     }
525     /* We are now at the beginning of the rra's */
526     rra_begin = rrd_file->header_len;
528     version = atoi(rrd.stat_head->version);
530     initialize_time(&current_time, &current_time_usec, version);
532     /* get exclusive lock to whole file.
533      * lock gets removed when we close the file.
534      */
535     if (rrd_lock(rrd_file) != 0) {
536         rrd_set_error("could not lock RRD");
537         goto err_close;
538     }
540     if (allocate_data_structures(&rrd, &updvals,
541                                  &pdp_temp, tmplt, &tmpl_idx, &tmpl_cnt,
542                                  &rra_step_cnt, &skip_update,
543                                  &pdp_new) == -1) {
544         goto err_close;
545     }
547     /* loop through the arguments. */
548     for (arg_i = 0; arg_i < argc; arg_i++) {
549         if ((arg_copy = strdup(argv[arg_i])) == NULL) {
550             rrd_set_error("failed duplication argv entry");
551             break;
552         }
553         if (process_arg(arg_copy, &rrd, rrd_file, rra_begin,
554                         &current_time, &current_time_usec, pdp_temp, pdp_new,
555                         rra_step_cnt, updvals, tmpl_idx, tmpl_cnt,
556                         &pcdp_summary, version, skip_update,
557                         &schedule_smooth) == -1) {
558             if (rrd_test_error()) { /* Should have error string always here */
559                 char     *save_error;
561                 /* Prepend file name to error message */
562                 if ((save_error = strdup(rrd_get_error())) != NULL) {
563                     rrd_set_error("%s: %s", filename, save_error);
564                     free(save_error);
565                 }
566             }
567             free(arg_copy);
568             break;
569         }
570         free(arg_copy);
571     }
573     free(rra_step_cnt);
575     /* if we got here and if there is an error and if the file has not been
576      * written to, then close things up and return. */
577     if (rrd_test_error()) {
578         goto err_free_structures;
579     }
580 #ifndef HAVE_MMAP
581     if (write_changes_to_disk(&rrd, rrd_file, version) == -1) {
582         goto err_free_structures;
583     }
584 #endif
586     /* calling the smoothing code here guarantees at most one smoothing
587      * operation per rrd_update call. Unfortunately, it is possible with bulk
588      * updates, or a long-delayed update for smoothing to occur off-schedule.
589      * This really isn't critical except during the burn-in cycles. */
590     if (schedule_smooth) {
591         smooth_all_rras(&rrd, rrd_file, rra_begin);
592     }
594 /*    rrd_dontneed(rrd_file,&rrd); */
595     rrd_free(&rrd);
596     rrd_close(rrd_file);
598     free(pdp_new);
599     free(tmpl_idx);
600     free(pdp_temp);
601     free(skip_update);
602     free(updvals);
603     return 0;
605   err_free_structures:
606     free(pdp_new);
607     free(tmpl_idx);
608     free(pdp_temp);
609     free(skip_update);
610     free(updvals);
611   err_close:
612     rrd_close(rrd_file);
613   err_free:
614     rrd_free(&rrd);
615   err_out:
616     return -1;
619 /*
620  * Allocate some important arrays used, and initialize the template.
621  *
622  * When it returns, either all of the structures are allocated
623  * or none of them are.
624  *
625  * Returns 0 on success, -1 on error.
626  */
627 static int allocate_data_structures(
628     rrd_t *rrd,
629     char ***updvals,
630     rrd_value_t **pdp_temp,
631     const char *tmplt,
632     long **tmpl_idx,
633     unsigned long *tmpl_cnt,
634     unsigned long **rra_step_cnt,
635     unsigned long **skip_update,
636     rrd_value_t **pdp_new)
638     unsigned  i, ii;
639     if ((*updvals = (char **) malloc(sizeof(char *)
640                                      * (rrd->stat_head->ds_cnt + 1))) == NULL) {
641         rrd_set_error("allocating updvals pointer array.");
642         return -1;
643     }
644     if ((*pdp_temp = (rrd_value_t *) malloc(sizeof(rrd_value_t)
645                                             * rrd->stat_head->ds_cnt)) ==
646         NULL) {
647         rrd_set_error("allocating pdp_temp.");
648         goto err_free_updvals;
649     }
650     if ((*skip_update = (unsigned long *) malloc(sizeof(unsigned long)
651                                                  *
652                                                  rrd->stat_head->rra_cnt)) ==
653         NULL) {
654         rrd_set_error("allocating skip_update.");
655         goto err_free_pdp_temp;
656     }
657     if ((*tmpl_idx = (long *) malloc(sizeof(unsigned long)
658                                      * (rrd->stat_head->ds_cnt + 1))) == NULL) {
659         rrd_set_error("allocating tmpl_idx.");
660         goto err_free_skip_update;
661     }
662     if ((*rra_step_cnt = (unsigned long *) malloc(sizeof(unsigned long)
663                                                   *
664                                                   (rrd->stat_head->
665                                                    rra_cnt))) == NULL) {
666         rrd_set_error("allocating rra_step_cnt.");
667         goto err_free_tmpl_idx;
668     }
670     /* initialize tmplt redirector */
671     /* default config example (assume DS 1 is a CDEF DS)
672        tmpl_idx[0] -> 0; (time)
673        tmpl_idx[1] -> 1; (DS 0)
674        tmpl_idx[2] -> 3; (DS 2)
675        tmpl_idx[3] -> 4; (DS 3) */
676     (*tmpl_idx)[0] = 0; /* time */
677     for (i = 1, ii = 1; i <= rrd->stat_head->ds_cnt; i++) {
678         if (dst_conv(rrd->ds_def[i - 1].dst) != DST_CDEF)
679             (*tmpl_idx)[ii++] = i;
680     }
681     *tmpl_cnt = ii;
683     if (tmplt != NULL) {
684         if (parse_template(rrd, tmplt, tmpl_cnt, *tmpl_idx) == -1) {
685             goto err_free_rra_step_cnt;
686         }
687     }
689     if ((*pdp_new = (rrd_value_t *) malloc(sizeof(rrd_value_t)
690                                            * rrd->stat_head->ds_cnt)) == NULL) {
691         rrd_set_error("allocating pdp_new.");
692         goto err_free_rra_step_cnt;
693     }
695     return 0;
697   err_free_rra_step_cnt:
698     free(*rra_step_cnt);
699   err_free_tmpl_idx:
700     free(*tmpl_idx);
701   err_free_skip_update:
702     free(*skip_update);
703   err_free_pdp_temp:
704     free(*pdp_temp);
705   err_free_updvals:
706     free(*updvals);
707     return -1;
710 /*
711  * Parses tmplt and puts an ordered list of DS's into tmpl_idx.
712  *
713  * Returns 0 on success.
714  */
715 static int parse_template(
716     rrd_t *rrd,
717     const char *tmplt,
718     unsigned long *tmpl_cnt,
719     long *tmpl_idx)
721     char     *dsname, *tmplt_copy;
722     unsigned int tmpl_len, i;
723     int       ret = 0;
725     *tmpl_cnt = 1;      /* the first entry is the time */
727     /* we should work on a writeable copy here */
728     if ((tmplt_copy = strdup(tmplt)) == NULL) {
729         rrd_set_error("error copying tmplt '%s'", tmplt);
730         ret = -1;
731         goto out;
732     }
734     dsname = tmplt_copy;
735     tmpl_len = strlen(tmplt_copy);
736     for (i = 0; i <= tmpl_len; i++) {
737         if (tmplt_copy[i] == ':' || tmplt_copy[i] == '\0') {
738             tmplt_copy[i] = '\0';
739             if (*tmpl_cnt > rrd->stat_head->ds_cnt) {
740                 rrd_set_error("tmplt contains more DS definitions than RRD");
741                 ret = -1;
742                 goto out_free_tmpl_copy;
743             }
744             if ((tmpl_idx[(*tmpl_cnt)++] = ds_match(rrd, dsname) + 1) == 0) {
745                 rrd_set_error("unknown DS name '%s'", dsname);
746                 ret = -1;
747                 goto out_free_tmpl_copy;
748             }
749             /* go to the next entry on the tmplt_copy */
750             if (i < tmpl_len)
751                 dsname = &tmplt_copy[i + 1];
752         }
753     }
754   out_free_tmpl_copy:
755     free(tmplt_copy);
756   out:
757     return ret;
760 /*
761  * Parse an update string, updates the primary data points (PDPs)
762  * and consolidated data points (CDPs), and writes changes to the RRAs.
763  *
764  * Returns 0 on success, -1 on error.
765  */
766 static int process_arg(
767     char *step_start,
768     rrd_t *rrd,
769     rrd_file_t *rrd_file,
770     unsigned long rra_begin,
771     time_t *current_time,
772     unsigned long *current_time_usec,
773     rrd_value_t *pdp_temp,
774     rrd_value_t *pdp_new,
775     unsigned long *rra_step_cnt,
776     char **updvals,
777     long *tmpl_idx,
778     unsigned long tmpl_cnt,
779     rrd_info_t ** pcdp_summary,
780     int version,
781     unsigned long *skip_update,
782     int *schedule_smooth)
784     rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
786     /* a vector of future Holt-Winters seasonal coefs */
787     unsigned long elapsed_pdp_st;
789     double    interval, pre_int, post_int;  /* interval between this and
790                                              * the last run */
791     unsigned long proc_pdp_cnt;
793     if (parse_ds(rrd, updvals, tmpl_idx, step_start, tmpl_cnt,
794                  current_time, current_time_usec, version) == -1) {
795         return -1;
796     }
798     interval = (double) (*current_time - rrd->live_head->last_up)
799         + (double) ((long) *current_time_usec -
800                     (long) rrd->live_head->last_up_usec) / 1e6f;
802     /* process the data sources and update the pdp_prep 
803      * area accordingly */
804     if (update_pdp_prep(rrd, updvals, pdp_new, interval) == -1) {
805         return -1;
806     }
808     elapsed_pdp_st = calculate_elapsed_steps(rrd,
809                                              *current_time,
810                                              *current_time_usec, interval,
811                                              &pre_int, &post_int,
812                                              &proc_pdp_cnt);
814     /* has a pdp_st moment occurred since the last run ? */
815     if (elapsed_pdp_st == 0) {
816         /* no we have not passed a pdp_st moment. therefore update is simple */
817         simple_update(rrd, interval, pdp_new);
818     } else {
819         /* an pdp_st has occurred. */
820         if (process_all_pdp_st(rrd, interval,
821                                pre_int, post_int,
822                                elapsed_pdp_st, pdp_new, pdp_temp) == -1) {
823             return -1;
824         }
825         if (update_all_cdp_prep(rrd, rra_step_cnt,
826                                 rra_begin, rrd_file,
827                                 elapsed_pdp_st,
828                                 proc_pdp_cnt,
829                                 &last_seasonal_coef,
830                                 &seasonal_coef,
831                                 pdp_temp,
832                                 skip_update, schedule_smooth) == -1) {
833             goto err_free_coefficients;
834         }
835         if (update_aberrant_cdps(rrd, rrd_file, rra_begin,
836                                  elapsed_pdp_st, pdp_temp,
837                                  &seasonal_coef) == -1) {
838             goto err_free_coefficients;
839         }
840         if (write_to_rras(rrd, rrd_file, rra_step_cnt, rra_begin,
841                           *current_time, skip_update,
842                           pcdp_summary) == -1) {
843             goto err_free_coefficients;
844         }
845     }                   /* endif a pdp_st has occurred */
846     rrd->live_head->last_up = *current_time;
847     rrd->live_head->last_up_usec = *current_time_usec;
849     if (version < 3) {
850         *rrd->legacy_last_up = rrd->live_head->last_up;
851     }
852     free(seasonal_coef);
853     free(last_seasonal_coef);
854     return 0;
856   err_free_coefficients:
857     free(seasonal_coef);
858     free(last_seasonal_coef);
859     return -1;
862 /*
863  * Parse a DS string (time + colon-separated values), storing the
864  * results in current_time, current_time_usec, and updvals.
865  *
866  * Returns 0 on success, -1 on error.
867  */
868 static int parse_ds(
869     rrd_t *rrd,
870     char **updvals,
871     long *tmpl_idx,
872     char *input,
873     unsigned long tmpl_cnt,
874     time_t *current_time,
875     unsigned long *current_time_usec,
876     int version)
878     char     *p;
879     unsigned long i;
880     char      timesyntax;
882     updvals[0] = input;
883     /* initialize all ds input to unknown except the first one
884        which has always got to be set */
885     for (i = 1; i <= rrd->stat_head->ds_cnt; i++)
886         updvals[i] = "U";
888     /* separate all ds elements; first must be examined separately
889        due to alternate time syntax */
890     if ((p = strchr(input, '@')) != NULL) {
891         timesyntax = '@';
892     } else if ((p = strchr(input, ':')) != NULL) {
893         timesyntax = ':';
894     } else {
895         rrd_set_error("expected timestamp not found in data source from %s",
896                       input);
897         return -1;
898     }
899     *p = '\0';
900     i = 1;
901     updvals[tmpl_idx[i++]] = p + 1;
902     while (*(++p)) {
903         if (*p == ':') {
904             *p = '\0';
905             if (i < tmpl_cnt) {
906                 updvals[tmpl_idx[i++]] = p + 1;
907             }
908             else {
909                 rrd_set_error("found extra data on update argument: %s",p+1);
910                 return -1;
911             }                
912         }
913     }
915     if (i != tmpl_cnt) {
916         rrd_set_error("expected %lu data source readings (got %lu) from %s",
917                       tmpl_cnt - 1, i - 1, input);
918         return -1;
919     }
921     if (get_time_from_reading(rrd, timesyntax, updvals,
922                               current_time, current_time_usec,
923                               version) == -1) {
924         return -1;
925     }
926     return 0;
929 /*
930  * Parse the time in a DS string, store it in current_time and 
931  * current_time_usec and verify that it's later than the last
932  * update for this DS.
933  *
934  * Returns 0 on success, -1 on error.
935  */
936 static int get_time_from_reading(
937     rrd_t *rrd,
938     char timesyntax,
939     char **updvals,
940     time_t *current_time,
941     unsigned long *current_time_usec,
942     int version)
944     double    tmp;
945     char     *parsetime_error = NULL;
946     char     *old_locale;
947     rrd_time_value_t ds_tv;
948     struct timeval tmp_time;    /* used for time conversion */
950     /* get the time from the reading ... handle N */
951     if (timesyntax == '@') {    /* at-style */
952         if ((parsetime_error = rrd_parsetime(updvals[0], &ds_tv))) {
953             rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error);
954             return -1;
955         }
956         if (ds_tv.type == RELATIVE_TO_END_TIME ||
957             ds_tv.type == RELATIVE_TO_START_TIME) {
958             rrd_set_error("specifying time relative to the 'start' "
959                           "or 'end' makes no sense here: %s", updvals[0]);
960             return -1;
961         }
962         *current_time = mktime(&ds_tv.tm) +ds_tv.offset;
963         *current_time_usec = 0; /* FIXME: how to handle usecs here ? */
964     } else if (strcmp(updvals[0], "N") == 0) {
965         gettimeofday(&tmp_time, 0);
966         normalize_time(&tmp_time);
967         *current_time = tmp_time.tv_sec;
968         *current_time_usec = tmp_time.tv_usec;
969     } else {
970         old_locale = setlocale(LC_NUMERIC, NULL);
971         setlocale(LC_NUMERIC, "C");
972         errno = 0;
973         tmp = strtod(updvals[0], 0);
974         if (errno > 0) {
975             rrd_set_error("converting '%s' to float: %s",
976                 updvals[0], rrd_strerror(errno));
977             return -1;
978         };
979         setlocale(LC_NUMERIC, old_locale);
980         if (tmp < 0.0){
981             gettimeofday(&tmp_time, 0);
982             tmp = (double)tmp_time.tv_sec + (double)tmp_time.tv_usec * 1e-6f + tmp;
983         }
985         *current_time = floor(tmp);
986         *current_time_usec = (long) ((tmp - (double) *current_time) * 1e6f);
987     }
988     /* dont do any correction for old version RRDs */
989     if (version < 3)
990         *current_time_usec = 0;
992     if (*current_time < rrd->live_head->last_up ||
993         (*current_time == rrd->live_head->last_up &&
994          (long) *current_time_usec <= (long) rrd->live_head->last_up_usec)) {
995         rrd_set_error("illegal attempt to update using time %ld when "
996                       "last update time is %ld (minimum one second step)",
997                       *current_time, rrd->live_head->last_up);
998         return -1;
999     }
1000     return 0;
1003 /*
1004  * Update pdp_new by interpreting the updvals according to the DS type
1005  * (COUNTER, GAUGE, etc.).
1006  *
1007  * Returns 0 on success, -1 on error.
1008  */
1009 static int update_pdp_prep(
1010     rrd_t *rrd,
1011     char **updvals,
1012     rrd_value_t *pdp_new,
1013     double interval)
1015     unsigned long ds_idx;
1016     int       ii;
1017     char     *endptr;   /* used in the conversion */
1018     double    rate;
1019     char     *old_locale;
1020     enum dst_en dst_idx;
1022     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1023         dst_idx = dst_conv(rrd->ds_def[ds_idx].dst);
1025         /* make sure we do not build diffs with old last_ds values */
1026         if (rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt < interval) {
1027             strncpy(rrd->pdp_prep[ds_idx].last_ds, "U", LAST_DS_LEN - 1);
1028             rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
1029         }
1031         /* NOTE: DST_CDEF should never enter this if block, because
1032          * updvals[ds_idx+1][0] is initialized to 'U'; unless the caller
1033          * accidently specified a value for the DST_CDEF. To handle this case,
1034          * an extra check is required. */
1036         if ((updvals[ds_idx + 1][0] != 'U') &&
1037             (dst_idx != DST_CDEF) &&
1038             rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt >= interval) {
1039             rate = DNAN;
1041             /* pdp_new contains rate * time ... eg the bytes transferred during
1042              * the interval. Doing it this way saves a lot of math operations
1043              */
1044             switch (dst_idx) {
1045             case DST_COUNTER:
1046             case DST_DERIVE:
1047                 /* Check if this is a valid integer. `U' is already handled in
1048                  * another branch. */
1049                 for (ii = 0; updvals[ds_idx + 1][ii] != 0; ii++) {
1050                     if ((ii == 0) && (dst_idx == DST_DERIVE)
1051                             && (updvals[ds_idx + 1][ii] == '-'))
1052                         continue;
1054                     if ((updvals[ds_idx + 1][ii] < '0')
1055                             || (updvals[ds_idx + 1][ii] > '9')) {
1056                         rrd_set_error("not a simple %s integer: '%s'",
1057                                 (dst_idx == DST_DERIVE) ? "signed" : "unsigned",
1058                                 updvals[ds_idx + 1]);
1059                         return -1;
1060                     }
1061                 } /* for (ii = 0; updvals[ds_idx + 1][ii] != 0; ii++) */
1063                 if (rrd->pdp_prep[ds_idx].last_ds[0] != 'U') {
1064                     pdp_new[ds_idx] =
1065                         rrd_diff(updvals[ds_idx + 1],
1066                                  rrd->pdp_prep[ds_idx].last_ds);
1067                     if (dst_idx == DST_COUNTER) {
1068                         /* simple overflow catcher. This will fail
1069                          * terribly for non 32 or 64 bit counters
1070                          * ... are there any others in SNMP land?
1071                          */
1072                         if (pdp_new[ds_idx] < (double) 0.0)
1073                             pdp_new[ds_idx] += (double) 4294967296.0;   /* 2^32 */
1074                         if (pdp_new[ds_idx] < (double) 0.0)
1075                             pdp_new[ds_idx] += (double) 18446744069414584320.0; /* 2^64-2^32 */
1076                     }
1077                     rate = pdp_new[ds_idx] / interval;
1078                 } else {
1079                     pdp_new[ds_idx] = DNAN;
1080                 }
1081                 break;
1082             case DST_ABSOLUTE:
1083                 old_locale = setlocale(LC_NUMERIC, NULL);
1084                 setlocale(LC_NUMERIC, "C");
1085                 errno = 0;
1086                 pdp_new[ds_idx] = strtod(updvals[ds_idx + 1], &endptr);
1087                 if (errno > 0) {
1088                     rrd_set_error("converting '%s' to float: %s",
1089                                   updvals[ds_idx + 1], rrd_strerror(errno));
1090                     return -1;
1091                 };
1092                 setlocale(LC_NUMERIC, old_locale);
1093                 if (endptr[0] != '\0') {
1094                     rrd_set_error
1095                         ("conversion of '%s' to float not complete: tail '%s'",
1096                          updvals[ds_idx + 1], endptr);
1097                     return -1;
1098                 }
1099                 rate = pdp_new[ds_idx] / interval;
1100                 break;
1101             case DST_GAUGE:
1102                 old_locale = setlocale(LC_NUMERIC, NULL);
1103                 setlocale(LC_NUMERIC, "C");
1104                 errno = 0;
1105                 pdp_new[ds_idx] =
1106                     strtod(updvals[ds_idx + 1], &endptr) * interval;
1107                 if (errno) {
1108                     rrd_set_error("converting '%s' to float: %s",
1109                                   updvals[ds_idx + 1], rrd_strerror(errno));
1110                     return -1;
1111                 };
1112                 setlocale(LC_NUMERIC, old_locale);
1113                 if (endptr[0] != '\0') {
1114                     rrd_set_error
1115                         ("conversion of '%s' to float not complete: tail '%s'",
1116                          updvals[ds_idx + 1], endptr);
1117                     return -1;
1118                 }
1119                 rate = pdp_new[ds_idx] / interval;
1120                 break;
1121             default:
1122                 rrd_set_error("rrd contains unknown DS type : '%s'",
1123                               rrd->ds_def[ds_idx].dst);
1124                 return -1;
1125             }
1126             /* break out of this for loop if the error string is set */
1127             if (rrd_test_error()) {
1128                 return -1;
1129             }
1130             /* make sure pdp_temp is neither too large or too small
1131              * if any of these occur it becomes unknown ...
1132              * sorry folks ... */
1133             if (!isnan(rate) &&
1134                 ((!isnan(rrd->ds_def[ds_idx].par[DS_max_val].u_val) &&
1135                   rate > rrd->ds_def[ds_idx].par[DS_max_val].u_val) ||
1136                  (!isnan(rrd->ds_def[ds_idx].par[DS_min_val].u_val) &&
1137                   rate < rrd->ds_def[ds_idx].par[DS_min_val].u_val))) {
1138                 pdp_new[ds_idx] = DNAN;
1139             }
1140         } else {
1141             /* no news is news all the same */
1142             pdp_new[ds_idx] = DNAN;
1143         }
1146         /* make a copy of the command line argument for the next run */
1147 #ifdef DEBUG
1148         fprintf(stderr, "prep ds[%lu]\t"
1149                 "last_arg '%s'\t"
1150                 "this_arg '%s'\t"
1151                 "pdp_new %10.2f\n",
1152                 ds_idx, rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1153                 pdp_new[ds_idx]);
1154 #endif
1155         strncpy(rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1156                 LAST_DS_LEN - 1);
1157         rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
1158     }
1159     return 0;
1162 /*
1163  * How many PDP steps have elapsed since the last update? Returns the answer,
1164  * and stores the time between the last update and the last PDP in pre_time,
1165  * and the time between the last PDP and the current time in post_int.
1166  */
1167 static int calculate_elapsed_steps(
1168     rrd_t *rrd,
1169     unsigned long current_time,
1170     unsigned long current_time_usec,
1171     double interval,
1172     double *pre_int,
1173     double *post_int,
1174     unsigned long *proc_pdp_cnt)
1176     unsigned long proc_pdp_st;  /* which pdp_st was the last to be processed */
1177     unsigned long occu_pdp_st;  /* when was the pdp_st before the last update
1178                                  * time */
1179     unsigned long proc_pdp_age; /* how old was the data in the pdp prep area 
1180                                  * when it was last updated */
1181     unsigned long occu_pdp_age; /* how long ago was the last pdp_step time */
1183     /* when was the current pdp started */
1184     proc_pdp_age = rrd->live_head->last_up % rrd->stat_head->pdp_step;
1185     proc_pdp_st = rrd->live_head->last_up - proc_pdp_age;
1187     /* when did the last pdp_st occur */
1188     occu_pdp_age = current_time % rrd->stat_head->pdp_step;
1189     occu_pdp_st = current_time - occu_pdp_age;
1191     if (occu_pdp_st > proc_pdp_st) {
1192         /* OK we passed the pdp_st moment */
1193         *pre_int = (long) occu_pdp_st - rrd->live_head->last_up;    /* how much of the input data
1194                                                                      * occurred before the latest
1195                                                                      * pdp_st moment*/
1196         *pre_int -= ((double) rrd->live_head->last_up_usec) / 1e6f; /* adjust usecs */
1197         *post_int = occu_pdp_age;   /* how much after it */
1198         *post_int += ((double) current_time_usec) / 1e6f;   /* adjust usecs */
1199     } else {
1200         *pre_int = interval;
1201         *post_int = 0;
1202     }
1204     *proc_pdp_cnt = proc_pdp_st / rrd->stat_head->pdp_step;
1206 #ifdef DEBUG
1207     printf("proc_pdp_age %lu\t"
1208            "proc_pdp_st %lu\t"
1209            "occu_pfp_age %lu\t"
1210            "occu_pdp_st %lu\t"
1211            "int %lf\t"
1212            "pre_int %lf\t"
1213            "post_int %lf\n", proc_pdp_age, proc_pdp_st,
1214            occu_pdp_age, occu_pdp_st, interval, *pre_int, *post_int);
1215 #endif
1217     /* compute the number of elapsed pdp_st moments */
1218     return (occu_pdp_st - proc_pdp_st) / rrd->stat_head->pdp_step;
1221 /*
1222  * Increment the PDP values by the values in pdp_new, or else initialize them.
1223  */
1224 static void simple_update(
1225     rrd_t *rrd,
1226     double interval,
1227     rrd_value_t *pdp_new)
1229     int       i;
1231     for (i = 0; i < (signed) rrd->stat_head->ds_cnt; i++) {
1232         if (isnan(pdp_new[i])) {
1233             /* this is not really accurate if we use subsecond data arrival time
1234                should have thought of it when going subsecond resolution ...
1235                sorry next format change we will have it! */
1236             rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt +=
1237                 floor(interval);
1238         } else {
1239             if (isnan(rrd->pdp_prep[i].scratch[PDP_val].u_val)) {
1240                 rrd->pdp_prep[i].scratch[PDP_val].u_val = pdp_new[i];
1241             } else {
1242                 rrd->pdp_prep[i].scratch[PDP_val].u_val += pdp_new[i];
1243             }
1244         }
1245 #ifdef DEBUG
1246         fprintf(stderr,
1247                 "NO PDP  ds[%i]\t"
1248                 "value %10.2f\t"
1249                 "unkn_sec %5lu\n",
1250                 i,
1251                 rrd->pdp_prep[i].scratch[PDP_val].u_val,
1252                 rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
1253 #endif
1254     }
1257 /*
1258  * Call process_pdp_st for each DS.
1259  *
1260  * Returns 0 on success, -1 on error.
1261  */
1262 static int process_all_pdp_st(
1263     rrd_t *rrd,
1264     double interval,
1265     double pre_int,
1266     double post_int,
1267     unsigned long elapsed_pdp_st,
1268     rrd_value_t *pdp_new,
1269     rrd_value_t *pdp_temp)
1271     unsigned long ds_idx;
1273     /* in pdp_prep[].scratch[PDP_val].u_val we have collected
1274        rate*seconds which occurred up to the last run.
1275        pdp_new[] contains rate*seconds from the latest run.
1276        pdp_temp[] will contain the rate for cdp */
1278     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1279         if (process_pdp_st(rrd, ds_idx, interval, pre_int, post_int,
1280                            elapsed_pdp_st * rrd->stat_head->pdp_step,
1281                            pdp_new, pdp_temp) == -1) {
1282             return -1;
1283         }
1284 #ifdef DEBUG
1285         fprintf(stderr, "PDP UPD ds[%lu]\t"
1286                 "elapsed_pdp_st %lu\t"
1287                 "pdp_temp %10.2f\t"
1288                 "new_prep %10.2f\t"
1289                 "new_unkn_sec %5lu\n",
1290                 ds_idx,
1291                 elapsed_pdp_st,
1292                 pdp_temp[ds_idx],
1293                 rrd->pdp_prep[ds_idx].scratch[PDP_val].u_val,
1294                 rrd->pdp_prep[ds_idx].scratch[PDP_unkn_sec_cnt].u_cnt);
1295 #endif
1296     }
1297     return 0;
1300 /*
1301  * Process an update that occurs after one of the PDP moments.
1302  * Increments the PDP value, sets NAN if time greater than the
1303  * heartbeats have elapsed, processes CDEFs.
1304  *
1305  * Returns 0 on success, -1 on error.
1306  */
1307 static int process_pdp_st(
1308     rrd_t *rrd,
1309     unsigned long ds_idx,
1310     double interval,
1311     double pre_int,
1312     double post_int,
1313     long diff_pdp_st,   /* number of seconds in full steps passed since last update */
1314     rrd_value_t *pdp_new,
1315     rrd_value_t *pdp_temp)
1317     int       i;
1319     /* update pdp_prep to the current pdp_st. */
1320     double    pre_unknown = 0.0;
1321     unival   *scratch = rrd->pdp_prep[ds_idx].scratch;
1322     unsigned long mrhb = rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt;
1324     rpnstack_t rpnstack;    /* used for COMPUTE DS */
1326     rpnstack_init(&rpnstack);
1329     if (isnan(pdp_new[ds_idx])) {
1330         /* a final bit of unknown to be added before calculation
1331            we use a temporary variable for this so that we
1332            don't have to turn integer lines before using the value */
1333         pre_unknown = pre_int;
1334     } else {
1335         if (isnan(scratch[PDP_val].u_val)) {
1336             scratch[PDP_val].u_val = 0;
1337         }
1338         scratch[PDP_val].u_val += pdp_new[ds_idx] / interval * pre_int;
1339     }
1341     /* if too much of the pdp_prep is unknown we dump it */
1342     /* if the interval is larger thatn mrhb we get NAN */
1343     if ((interval > mrhb) ||
1344         (rrd->stat_head->pdp_step / 2.0 <
1345          (signed) scratch[PDP_unkn_sec_cnt].u_cnt)) {
1346         pdp_temp[ds_idx] = DNAN;
1347     } else {
1348         pdp_temp[ds_idx] = scratch[PDP_val].u_val /
1349             ((double) (diff_pdp_st - scratch[PDP_unkn_sec_cnt].u_cnt) -
1350              pre_unknown);
1351     }
1353     /* process CDEF data sources; remember each CDEF DS can
1354      * only reference other DS with a lower index number */
1355     if (dst_conv(rrd->ds_def[ds_idx].dst) == DST_CDEF) {
1356         rpnp_t   *rpnp;
1358         rpnp =
1359             rpn_expand((rpn_cdefds_t *) &(rrd->ds_def[ds_idx].par[DS_cdef]));
1360         if(rpnp == NULL) {
1361           rpnstack_free(&rpnstack);
1362           return -1;
1363         }
1364         /* substitute data values for OP_VARIABLE nodes */
1365         for (i = 0; rpnp[i].op != OP_END; i++) {
1366             if (rpnp[i].op == OP_VARIABLE) {
1367                 rpnp[i].op = OP_NUMBER;
1368                 rpnp[i].val = pdp_temp[rpnp[i].ptr];
1369             }
1370         }
1371         /* run the rpn calculator */
1372         if (rpn_calc(rpnp, &rpnstack, 0, pdp_temp, ds_idx) == -1) {
1373             free(rpnp);
1374             rpnstack_free(&rpnstack);
1375             return -1;
1376         }
1377         free(rpnp);
1378     }
1380     /* make pdp_prep ready for the next run */
1381     if (isnan(pdp_new[ds_idx])) {
1382         /* this is not realy accurate if we use subsecond data arival time
1383            should have thought of it when going subsecond resolution ...
1384            sorry next format change we will have it! */
1385         scratch[PDP_unkn_sec_cnt].u_cnt = floor(post_int);
1386         scratch[PDP_val].u_val = DNAN;
1387     } else {
1388         scratch[PDP_unkn_sec_cnt].u_cnt = 0;
1389         scratch[PDP_val].u_val = pdp_new[ds_idx] / interval * post_int;
1390     }
1391     rpnstack_free(&rpnstack);
1392     return 0;
1395 /*
1396  * Iterate over all the RRAs for a given DS and:
1397  * 1. Decide whether to schedule a smooth later
1398  * 2. Decide whether to skip updating SEASONAL and DEVSEASONAL
1399  * 3. Update the CDP
1400  *
1401  * Returns 0 on success, -1 on error
1402  */
1403 static int update_all_cdp_prep(
1404     rrd_t *rrd,
1405     unsigned long *rra_step_cnt,
1406     unsigned long rra_begin,
1407     rrd_file_t *rrd_file,
1408     unsigned long elapsed_pdp_st,
1409     unsigned long proc_pdp_cnt,
1410     rrd_value_t **last_seasonal_coef,
1411     rrd_value_t **seasonal_coef,
1412     rrd_value_t *pdp_temp,
1413     unsigned long *skip_update,
1414     int *schedule_smooth)
1416     unsigned long rra_idx;
1418     /* index into the CDP scratch array */
1419     enum cf_en current_cf;
1420     unsigned long rra_start;
1422     /* number of rows to be updated in an RRA for a data value. */
1423     unsigned long start_pdp_offset;
1425     rra_start = rra_begin;
1426     for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1427         current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1428         start_pdp_offset =
1429             rrd->rra_def[rra_idx].pdp_cnt -
1430             proc_pdp_cnt % rrd->rra_def[rra_idx].pdp_cnt;
1431         skip_update[rra_idx] = 0;
1432         if (start_pdp_offset <= elapsed_pdp_st) {
1433             rra_step_cnt[rra_idx] = (elapsed_pdp_st - start_pdp_offset) /
1434                 rrd->rra_def[rra_idx].pdp_cnt + 1;
1435         } else {
1436             rra_step_cnt[rra_idx] = 0;
1437         }
1439         if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1440             /* If this is a bulk update, we need to skip ahead in the seasonal arrays
1441              * so that they will be correct for the next observed value; note that for
1442              * the bulk update itself, no update will occur to DEVSEASONAL or SEASONAL;
1443              * futhermore, HWPREDICT and DEVPREDICT will be set to DNAN. */
1444             if (rra_step_cnt[rra_idx] > 1) {
1445                 skip_update[rra_idx] = 1;
1446                 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1447                                 elapsed_pdp_st, last_seasonal_coef);
1448                 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1449                                 elapsed_pdp_st + 1, seasonal_coef);
1450             }
1451             /* periodically run a smoother for seasonal effects */
1452             if (do_schedule_smooth(rrd, rra_idx, elapsed_pdp_st)) {
1453 #ifdef DEBUG
1454                 fprintf(stderr,
1455                         "schedule_smooth: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
1456                         rrd->rra_ptr[rra_idx].cur_row, elapsed_pdp_st,
1457                         rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].
1458                         u_cnt);
1459 #endif
1460                 *schedule_smooth = 1;
1461             }
1462         }
1463         if (rrd_test_error())
1464             return -1;
1466         if (update_cdp_prep
1467             (rrd, elapsed_pdp_st, start_pdp_offset, rra_step_cnt, rra_idx,
1468              pdp_temp, *last_seasonal_coef, *seasonal_coef,
1469              current_cf) == -1) {
1470             return -1;
1471         }
1472         rra_start +=
1473             rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt *
1474             sizeof(rrd_value_t);
1475     }
1476     return 0;
1479 /* 
1480  * Are we due for a smooth? Also increments our position in the burn-in cycle.
1481  */
1482 static int do_schedule_smooth(
1483     rrd_t *rrd,
1484     unsigned long rra_idx,
1485     unsigned long elapsed_pdp_st)
1487     unsigned long cdp_idx = rra_idx * (rrd->stat_head->ds_cnt);
1488     unsigned long cur_row = rrd->rra_ptr[rra_idx].cur_row;
1489     unsigned long row_cnt = rrd->rra_def[rra_idx].row_cnt;
1490     unsigned long seasonal_smooth_idx =
1491         rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].u_cnt;
1492     unsigned long *init_seasonal =
1493         &(rrd->cdp_prep[cdp_idx].scratch[CDP_init_seasonal].u_cnt);
1495     /* Need to use first cdp parameter buffer to track burnin (burnin requires
1496      * a specific smoothing schedule).  The CDP_init_seasonal parameter is
1497      * really an RRA level, not a data source within RRA level parameter, but
1498      * the rra_def is read only for rrd_update (not flushed to disk). */
1499     if (*init_seasonal > BURNIN_CYCLES) {
1500         /* someone has no doubt invented a trick to deal with this wrap around,
1501          * but at least this code is clear. */
1502         if (seasonal_smooth_idx > cur_row) {
1503             /* here elapsed_pdp_st = rra_step_cnt[rra_idx] because of 1-1 mapping
1504              * between PDP and CDP */
1505             return (cur_row + elapsed_pdp_st >= seasonal_smooth_idx);
1506         }
1507         /* can't rely on negative numbers because we are working with
1508          * unsigned values */
1509         return (cur_row + elapsed_pdp_st >= row_cnt
1510                 && cur_row + elapsed_pdp_st >= row_cnt + seasonal_smooth_idx);
1511     }
1512     /* mark off one of the burn-in cycles */
1513     return (cur_row + elapsed_pdp_st >= row_cnt && ++(*init_seasonal));
1516 /*
1517  * For a given RRA, iterate over the data sources and call the appropriate
1518  * consolidation function.
1519  *
1520  * Returns 0 on success, -1 on error.
1521  */
1522 static int update_cdp_prep(
1523     rrd_t *rrd,
1524     unsigned long elapsed_pdp_st,
1525     unsigned long start_pdp_offset,
1526     unsigned long *rra_step_cnt,
1527     int rra_idx,
1528     rrd_value_t *pdp_temp,
1529     rrd_value_t *last_seasonal_coef,
1530     rrd_value_t *seasonal_coef,
1531     int current_cf)
1533     unsigned long ds_idx, cdp_idx;
1535     /* update CDP_PREP areas */
1536     /* loop over data soures within each RRA */
1537     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1539         cdp_idx = rra_idx * rrd->stat_head->ds_cnt + ds_idx;
1541         if (rrd->rra_def[rra_idx].pdp_cnt > 1) {
1542             update_cdp(rrd->cdp_prep[cdp_idx].scratch, current_cf,
1543                        pdp_temp[ds_idx], rra_step_cnt[rra_idx],
1544                        elapsed_pdp_st, start_pdp_offset,
1545                        rrd->rra_def[rra_idx].pdp_cnt,
1546                        rrd->rra_def[rra_idx].par[RRA_cdp_xff_val].u_val,
1547                        rra_idx, ds_idx);
1548         } else {
1549             /* Nothing to consolidate if there's one PDP per CDP. However, if
1550              * we've missed some PDPs, let's update null counters etc. */
1551             if (elapsed_pdp_st > 2) {
1552                 reset_cdp(rrd, elapsed_pdp_st, pdp_temp, last_seasonal_coef,
1553                           seasonal_coef, rra_idx, ds_idx, cdp_idx,
1554                           (enum cf_en)current_cf);
1555             }
1556         }
1558         if (rrd_test_error())
1559             return -1;
1560     }                   /* endif data sources loop */
1561     return 0;
1564 /*
1565  * Given the new reading (pdp_temp_val), update or initialize the CDP value,
1566  * primary value, secondary value, and # of unknowns.
1567  */
1568 static void update_cdp(
1569     unival *scratch,
1570     int current_cf,
1571     rrd_value_t pdp_temp_val,
1572     unsigned long rra_step_cnt,
1573     unsigned long elapsed_pdp_st,
1574     unsigned long start_pdp_offset,
1575     unsigned long pdp_cnt,
1576     rrd_value_t xff,
1577     int i,
1578     int ii)
1580     /* shorthand variables */
1581     rrd_value_t *cdp_val = &scratch[CDP_val].u_val;
1582     rrd_value_t *cdp_primary_val = &scratch[CDP_primary_val].u_val;
1583     rrd_value_t *cdp_secondary_val = &scratch[CDP_secondary_val].u_val;
1584     unsigned long *cdp_unkn_pdp_cnt = &scratch[CDP_unkn_pdp_cnt].u_cnt;
1586     if (rra_step_cnt) {
1587         /* If we are in this block, as least 1 CDP value will be written to
1588          * disk, this is the CDP_primary_val entry. If more than 1 value needs
1589          * to be written, then the "fill in" value is the CDP_secondary_val
1590          * entry. */
1591         if (isnan(pdp_temp_val)) {
1592             *cdp_unkn_pdp_cnt += start_pdp_offset;
1593             *cdp_secondary_val = DNAN;
1594         } else {
1595             /* CDP_secondary value is the RRA "fill in" value for intermediary
1596              * CDP data entries. No matter the CF, the value is the same because
1597              * the average, max, min, and last of a list of identical values is
1598              * the same, namely, the value itself. */
1599             *cdp_secondary_val = pdp_temp_val;
1600         }
1602         if (*cdp_unkn_pdp_cnt > pdp_cnt * xff) {
1603             *cdp_primary_val = DNAN;
1604         } else {
1605             initialize_cdp_val(scratch, current_cf, pdp_temp_val,
1606                                start_pdp_offset, pdp_cnt);
1607         }
1608         *cdp_val =
1609             initialize_carry_over(pdp_temp_val,current_cf,
1610                                   elapsed_pdp_st,
1611                                   start_pdp_offset, pdp_cnt);
1612                /* endif meets xff value requirement for a valid value */
1613         /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
1614          * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1615         if (isnan(pdp_temp_val))
1616             *cdp_unkn_pdp_cnt = (elapsed_pdp_st - start_pdp_offset) % pdp_cnt;
1617         else
1618             *cdp_unkn_pdp_cnt = 0;
1619     } else {            /* rra_step_cnt[i]  == 0 */
1621 #ifdef DEBUG
1622         if (isnan(*cdp_val)) {
1623             fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, DNAN\n",
1624                     i, ii);
1625         } else {
1626             fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, %10.2f\n",
1627                     i, ii, *cdp_val);
1628         }
1629 #endif
1630         if (isnan(pdp_temp_val)) {
1631             *cdp_unkn_pdp_cnt += elapsed_pdp_st;
1632         } else {
1633             *cdp_val =
1634                 calculate_cdp_val(*cdp_val, pdp_temp_val, elapsed_pdp_st,
1635                                   current_cf, i, ii);
1636         }
1637     }
1640 /*
1641  * Set the CDP_primary_val and CDP_val to the appropriate initial value based
1642  * on the type of consolidation function.
1643  */
1644 static void initialize_cdp_val(
1645     unival *scratch,
1646     int current_cf,
1647     rrd_value_t pdp_temp_val,
1648     unsigned long start_pdp_offset,
1649     unsigned long pdp_cnt)
1651     rrd_value_t cum_val, cur_val;
1653     switch (current_cf) {
1654     case CF_AVERAGE:
1655         cum_val = IFDNAN(scratch[CDP_val].u_val, 0.0);
1656         cur_val = IFDNAN(pdp_temp_val, 0.0);
1657         scratch[CDP_primary_val].u_val =
1658             (cum_val + cur_val * start_pdp_offset) /
1659             (pdp_cnt - scratch[CDP_unkn_pdp_cnt].u_cnt);
1660         break;
1661     case CF_MAXIMUM: 
1662         cum_val = IFDNAN(scratch[CDP_val].u_val, -DINF);
1663         cur_val = IFDNAN(pdp_temp_val, -DINF);
1665 #if 0
1666 #ifdef DEBUG
1667         if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1668             fprintf(stderr,
1669                     "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1670                     i, ii);
1671             exit(-1);
1672         }
1673 #endif
1674 #endif
1675         if (cur_val > cum_val)
1676             scratch[CDP_primary_val].u_val = cur_val;
1677         else
1678             scratch[CDP_primary_val].u_val = cum_val;
1679         break;
1680     case CF_MINIMUM:
1681         cum_val = IFDNAN(scratch[CDP_val].u_val, DINF);
1682         cur_val = IFDNAN(pdp_temp_val, DINF);
1683 #if 0
1684 #ifdef DEBUG
1685         if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1686             fprintf(stderr,
1687                     "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!", i,
1688                     ii);
1689             exit(-1);
1690         }
1691 #endif
1692 #endif
1693         if (cur_val < cum_val)
1694             scratch[CDP_primary_val].u_val = cur_val;
1695         else
1696             scratch[CDP_primary_val].u_val = cum_val;
1697         break;
1698     case CF_LAST:
1699     default:
1700         scratch[CDP_primary_val].u_val = pdp_temp_val;
1701         break;
1702     }
1705 /*
1706  * Update the consolidation function for Holt-Winters functions as
1707  * well as other functions that don't actually consolidate multiple
1708  * PDPs.
1709  */
1710 static void reset_cdp(
1711     rrd_t *rrd,
1712     unsigned long elapsed_pdp_st,
1713     rrd_value_t *pdp_temp,
1714     rrd_value_t *last_seasonal_coef,
1715     rrd_value_t *seasonal_coef,
1716     int rra_idx,
1717     int ds_idx,
1718     int cdp_idx,
1719     enum cf_en current_cf)
1721     unival   *scratch = rrd->cdp_prep[cdp_idx].scratch;
1723     switch (current_cf) {
1724     case CF_AVERAGE:
1725     default:
1726         scratch[CDP_primary_val].u_val = pdp_temp[ds_idx];
1727         scratch[CDP_secondary_val].u_val = pdp_temp[ds_idx];
1728         break;
1729     case CF_SEASONAL:
1730     case CF_DEVSEASONAL:
1731         /* need to update cached seasonal values, so they are consistent
1732          * with the bulk update */
1733         /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1734          * CDP_last_deviation are the same. */
1735         scratch[CDP_hw_last_seasonal].u_val = last_seasonal_coef[ds_idx];
1736         scratch[CDP_hw_seasonal].u_val = seasonal_coef[ds_idx];
1737         break;
1738     case CF_HWPREDICT:
1739     case CF_MHWPREDICT:
1740         /* need to update the null_count and last_null_count.
1741          * even do this for non-DNAN pdp_temp because the
1742          * algorithm is not learning from batch updates. */
1743         scratch[CDP_null_count].u_cnt += elapsed_pdp_st;
1744         scratch[CDP_last_null_count].u_cnt += elapsed_pdp_st - 1;
1745         /* fall through */
1746     case CF_DEVPREDICT:
1747         scratch[CDP_primary_val].u_val = DNAN;
1748         scratch[CDP_secondary_val].u_val = DNAN;
1749         break;
1750     case CF_FAILURES:
1751         /* do not count missed bulk values as failures */
1752         scratch[CDP_primary_val].u_val = 0;
1753         scratch[CDP_secondary_val].u_val = 0;
1754         /* need to reset violations buffer.
1755          * could do this more carefully, but for now, just
1756          * assume a bulk update wipes away all violations. */
1757         erase_violations(rrd, cdp_idx, rra_idx);
1758         break;
1759     }
1762 static rrd_value_t initialize_carry_over(
1763     rrd_value_t pdp_temp_val,
1764     int current_cf,
1765     unsigned long elapsed_pdp_st,
1766     unsigned long start_pdp_offset,
1767     unsigned long pdp_cnt)
1769     unsigned long pdp_into_cdp_cnt = ((elapsed_pdp_st - start_pdp_offset) % pdp_cnt);
1770     if ( pdp_into_cdp_cnt == 0 || isnan(pdp_temp_val)){
1771         switch (current_cf) {
1772         case CF_MAXIMUM:
1773             return -DINF;
1774         case CF_MINIMUM:
1775             return DINF;
1776         case CF_AVERAGE:
1777             return 0;
1778         default:
1779             return DNAN;
1780         }        
1781     } 
1782     else {
1783         switch (current_cf) {
1784         case CF_AVERAGE:
1785             return pdp_temp_val *  pdp_into_cdp_cnt ;
1786         default:
1787             return pdp_temp_val;
1788         }        
1789     }        
1792 /*
1793  * Update or initialize a CDP value based on the consolidation
1794  * function.
1795  *
1796  * Returns the new value.
1797  */
1798 static rrd_value_t calculate_cdp_val(
1799     rrd_value_t cdp_val,
1800     rrd_value_t pdp_temp_val,
1801     unsigned long elapsed_pdp_st,
1802     int current_cf,
1803 #ifdef DEBUG
1804     int i,
1805     int ii
1806 #else
1807     int UNUSED(i),
1808     int UNUSED(ii)
1809 #endif
1810     )
1812     if (isnan(cdp_val)) {
1813         if (current_cf == CF_AVERAGE) {
1814             pdp_temp_val *= elapsed_pdp_st;
1815         }
1816 #ifdef DEBUG
1817         fprintf(stderr, "Initialize CDP_val for RRA %d DS %d: %10.2f\n",
1818                 i, ii, pdp_temp_val);
1819 #endif
1820         return pdp_temp_val;
1821     }
1822     if (current_cf == CF_AVERAGE)
1823         return cdp_val + pdp_temp_val * elapsed_pdp_st;
1824     if (current_cf == CF_MINIMUM)
1825         return (pdp_temp_val < cdp_val) ? pdp_temp_val : cdp_val;
1826     if (current_cf == CF_MAXIMUM)
1827         return (pdp_temp_val > cdp_val) ? pdp_temp_val : cdp_val;
1829     return pdp_temp_val;
1832 /*
1833  * For each RRA, update the seasonal values and then call update_aberrant_CF
1834  * for each data source.
1835  *
1836  * Return 0 on success, -1 on error.
1837  */
1838 static int update_aberrant_cdps(
1839     rrd_t *rrd,
1840     rrd_file_t *rrd_file,
1841     unsigned long rra_begin,
1842     unsigned long elapsed_pdp_st,
1843     rrd_value_t *pdp_temp,
1844     rrd_value_t **seasonal_coef)
1846     unsigned long rra_idx, ds_idx, j;
1848     /* number of PDP steps since the last update that
1849      * are assigned to the first CDP to be generated
1850      * since the last update. */
1851     unsigned short scratch_idx;
1852     unsigned long rra_start;
1853     enum cf_en current_cf;
1855     /* this loop is only entered if elapsed_pdp_st < 3 */
1856     for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1857          j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val) {
1858         rra_start = rra_begin;
1859         for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1860             if (rrd->rra_def[rra_idx].pdp_cnt == 1) {
1861                 current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1862                 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1863                     if (scratch_idx == CDP_primary_val) {
1864                         lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1865                                         elapsed_pdp_st + 1, seasonal_coef);
1866                     } else {
1867                         lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1868                                         elapsed_pdp_st + 2, seasonal_coef);
1869                     }
1870                 }
1871                 if (rrd_test_error())
1872                     return -1;
1873                 /* loop over data soures within each RRA */
1874                 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1875                     update_aberrant_CF(rrd, pdp_temp[ds_idx], current_cf,
1876                                        rra_idx * (rrd->stat_head->ds_cnt) +
1877                                        ds_idx, rra_idx, ds_idx, scratch_idx,
1878                                        *seasonal_coef);
1879                 }
1880             }
1881             rra_start += rrd->rra_def[rra_idx].row_cnt
1882                 * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
1883         }
1884     }
1885     return 0;
1888 /* 
1889  * Move sequentially through the file, writing one RRA at a time.  Note this
1890  * architecture divorces the computation of CDP with flushing updated RRA
1891  * entries to disk.
1892  *
1893  * Return 0 on success, -1 on error.
1894  */
1895 static int write_to_rras(
1896     rrd_t *rrd,
1897     rrd_file_t *rrd_file,
1898     unsigned long *rra_step_cnt,
1899     unsigned long rra_begin,
1900     time_t current_time,
1901     unsigned long *skip_update,
1902     rrd_info_t ** pcdp_summary)
1904     unsigned long rra_idx;
1905     unsigned long rra_start;
1906     time_t    rra_time = 0; /* time of update for a RRA */
1908     unsigned long ds_cnt = rrd->stat_head->ds_cnt;
1909     
1910     /* Ready to write to disk */
1911     rra_start = rra_begin;
1913     for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1914         rra_def_t *rra_def = &rrd->rra_def[rra_idx];
1915         rra_ptr_t *rra_ptr = &rrd->rra_ptr[rra_idx];
1917         /* for cdp_prep */
1918         unsigned short scratch_idx;
1919         unsigned long step_subtract;
1921         for (scratch_idx = CDP_primary_val,
1922                  step_subtract = 1;
1923              rra_step_cnt[rra_idx] > 0;
1924              rra_step_cnt[rra_idx]--,
1925                  scratch_idx = CDP_secondary_val,
1926                  step_subtract = 2) {
1928             size_t rra_pos_new;
1929 #ifdef DEBUG
1930             fprintf(stderr, "  -- RRA Preseek %ld\n", rrd_file->pos);
1931 #endif
1932             /* increment, with wrap-around */
1933             if (++rra_ptr->cur_row >= rra_def->row_cnt)
1934               rra_ptr->cur_row = 0;
1936             /* we know what our position should be */
1937             rra_pos_new = rra_start
1938               + ds_cnt * rra_ptr->cur_row * sizeof(rrd_value_t);
1940             /* re-seek if the position is wrong or we wrapped around */
1941             if ((size_t)rra_pos_new != rrd_file->pos) {
1942                 if (rrd_seek(rrd_file, rra_pos_new, SEEK_SET) != 0) {
1943                     rrd_set_error("seek error in rrd");
1944                     return -1;
1945                 }
1946             }
1947 #ifdef DEBUG
1948             fprintf(stderr, "  -- RRA Postseek %ld\n", rrd_file->pos);
1949 #endif
1951             if (skip_update[rra_idx])
1952                 continue;
1954             if (*pcdp_summary != NULL) {
1955                 unsigned long step_time = rra_def->pdp_cnt * rrd->stat_head->pdp_step;
1957                 rra_time = (current_time - current_time % step_time)
1958                     - ((rra_step_cnt[rra_idx] - step_subtract) * step_time);
1959             }
1961             if (write_RRA_row
1962                 (rrd_file, rrd, rra_idx, scratch_idx,
1963                  pcdp_summary, rra_time) == -1)
1964                 return -1;
1966             rrd_notify_row(rrd_file, rra_idx, rra_pos_new, rra_time);
1967         }
1969         rra_start += rra_def->row_cnt * ds_cnt * sizeof(rrd_value_t);
1970     } /* RRA LOOP */
1972     return 0;
1975 /*
1976  * Write out one row of values (one value per DS) to the archive.
1977  *
1978  * Returns 0 on success, -1 on error.
1979  */
1980 static int write_RRA_row(
1981     rrd_file_t *rrd_file,
1982     rrd_t *rrd,
1983     unsigned long rra_idx,
1984     unsigned short CDP_scratch_idx,
1985     rrd_info_t ** pcdp_summary,
1986     time_t rra_time)
1988     unsigned long ds_idx, cdp_idx;
1989     rrd_infoval_t iv;
1991     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1992         /* compute the cdp index */
1993         cdp_idx = rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
1994 #ifdef DEBUG
1995         fprintf(stderr, "  -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1996                 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,
1997                 rrd_file->pos, rrd->rra_def[rra_idx].cf_nam);
1998 #endif
1999         if (*pcdp_summary != NULL) {
2000             iv.u_val = rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
2001             /* append info to the return hash */
2002             *pcdp_summary = rrd_info_push(*pcdp_summary,
2003                                           sprintf_alloc
2004                                           ("[%lli]RRA[%s][%lu]DS[%s]", 
2005                                            (long long)rra_time,
2006                                            rrd->rra_def[rra_idx].cf_nam,
2007                                            rrd->rra_def[rra_idx].pdp_cnt,
2008                                            rrd->ds_def[ds_idx].ds_nam),
2009                                            RD_I_VAL, iv);
2010         }
2011         errno = 0;
2012         if (rrd_write(rrd_file,
2013                       &(rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].
2014                         u_val), sizeof(rrd_value_t)) != sizeof(rrd_value_t)) {
2015             rrd_set_error("writing rrd: %s", rrd_strerror(errno));
2016             return -1;
2017         }
2018     }
2019     return 0;
2022 /*
2023  * Call apply_smoother for all DEVSEASONAL and SEASONAL RRAs.
2024  *
2025  * Returns 0 on success, -1 otherwise
2026  */
2027 static int smooth_all_rras(
2028     rrd_t *rrd,
2029     rrd_file_t *rrd_file,
2030     unsigned long rra_begin)
2032     unsigned long rra_start = rra_begin;
2033     unsigned long rra_idx;
2035     for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; ++rra_idx) {
2036         if (cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_DEVSEASONAL ||
2037             cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_SEASONAL) {
2038 #ifdef DEBUG
2039             fprintf(stderr, "Running smoother for rra %lu\n", rra_idx);
2040 #endif
2041             apply_smoother(rrd, rra_idx, rra_start, rrd_file);
2042             if (rrd_test_error())
2043                 return -1;
2044         }
2045         rra_start += rrd->rra_def[rra_idx].row_cnt
2046             * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
2047     }
2048     return 0;
2051 #ifndef HAVE_MMAP
2052 /*
2053  * Flush changes to disk (unless we're using mmap)
2054  *
2055  * Returns 0 on success, -1 otherwise
2056  */
2057 static int write_changes_to_disk(
2058     rrd_t *rrd,
2059     rrd_file_t *rrd_file,
2060     int version)
2062     /* we just need to write back the live header portion now */
2063     if (rrd_seek(rrd_file, (sizeof(stat_head_t)
2064                             + sizeof(ds_def_t) * rrd->stat_head->ds_cnt
2065                             + sizeof(rra_def_t) * rrd->stat_head->rra_cnt),
2066                  SEEK_SET) != 0) {
2067         rrd_set_error("seek rrd for live header writeback");
2068         return -1;
2069     }
2070     if (version >= 3) {
2071         if (rrd_write(rrd_file, rrd->live_head,
2072                       sizeof(live_head_t) * 1) != sizeof(live_head_t) * 1) {
2073             rrd_set_error("rrd_write live_head to rrd");
2074             return -1;
2075         }
2076     } else {
2077         if (rrd_write(rrd_file, rrd->legacy_last_up,
2078                       sizeof(time_t) * 1) != sizeof(time_t) * 1) {
2079             rrd_set_error("rrd_write live_head to rrd");
2080             return -1;
2081         }
2082     }
2085     if (rrd_write(rrd_file, rrd->pdp_prep,
2086                   sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)
2087         != (ssize_t) (sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)) {
2088         rrd_set_error("rrd_write pdp_prep to rrd");
2089         return -1;
2090     }
2092     if (rrd_write(rrd_file, rrd->cdp_prep,
2093                   sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2094                   rrd->stat_head->ds_cnt)
2095         != (ssize_t) (sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2096                       rrd->stat_head->ds_cnt)) {
2098         rrd_set_error("rrd_write cdp_prep to rrd");
2099         return -1;
2100     }
2102     if (rrd_write(rrd_file, rrd->rra_ptr,
2103                   sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)
2104         != (ssize_t) (sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)) {
2105         rrd_set_error("rrd_write rra_ptr to rrd");
2106         return -1;
2107     }
2108     return 0;
2110 #endif