Code

make negative update times work as diescribed in the documentation (-5 is NOW-5seconds)
[rrdtool-all.git] / program / src / rrd_update.c
2 /*****************************************************************************
3  * RRDtool 1.3.8  Copyright by Tobi Oetiker, 1997-2009
4  *****************************************************************************
5  * rrd_update.c  RRD Update Function
6  *****************************************************************************
7  * $Id$
8  *****************************************************************************/
10 #include "rrd_tool.h"
12 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
13 #include <sys/locking.h>
14 #include <sys/stat.h>
15 #include <io.h>
16 #endif
18 #include <locale.h>
20 #ifdef WIN32
21 #include <stdlib.h>
22 #endif
24 #include "rrd_hw.h"
25 #include "rrd_rpncalc.h"
27 #include "rrd_is_thread_safe.h"
28 #include "unused.h"
30 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
31 /*
32  * WIN32 does not have gettimeofday     and struct timeval. This is a quick and dirty
33  * replacement.
34  */
35 #include <sys/timeb.h>
37 #ifndef __MINGW32__
38 struct timeval {
39     time_t    tv_sec;   /* seconds */
40     long      tv_usec;  /* microseconds */
41 };
42 #endif
44 struct __timezone {
45     int       tz_minuteswest;   /* minutes W of Greenwich */
46     int       tz_dsttime;   /* type of dst correction */
47 };
49 static int gettimeofday(
50     struct timeval *t,
51     struct __timezone *tz)
52 {
54     struct _timeb current_time;
56     _ftime(&current_time);
58     t->tv_sec = current_time.time;
59     t->tv_usec = current_time.millitm * 1000;
61     return 0;
62 }
64 #endif
66 /* FUNCTION PROTOTYPES */
68 int       rrd_update_r(
69     const char *filename,
70     const char *tmplt,
71     int argc,
72     const char **argv);
73 int       _rrd_update(
74     const char *filename,
75     const char *tmplt,
76     int argc,
77     const char **argv,
78     rrd_info_t *);
80 static int allocate_data_structures(
81     rrd_t *rrd,
82     char ***updvals,
83     rrd_value_t **pdp_temp,
84     const char *tmplt,
85     long **tmpl_idx,
86     unsigned long *tmpl_cnt,
87     unsigned long **rra_step_cnt,
88     unsigned long **skip_update,
89     rrd_value_t **pdp_new);
91 static int parse_template(
92     rrd_t *rrd,
93     const char *tmplt,
94     unsigned long *tmpl_cnt,
95     long *tmpl_idx);
97 static int process_arg(
98     char *step_start,
99     rrd_t *rrd,
100     rrd_file_t *rrd_file,
101     unsigned long rra_begin,
102     time_t *current_time,
103     unsigned long *current_time_usec,
104     rrd_value_t *pdp_temp,
105     rrd_value_t *pdp_new,
106     unsigned long *rra_step_cnt,
107     char **updvals,
108     long *tmpl_idx,
109     unsigned long tmpl_cnt,
110     rrd_info_t ** pcdp_summary,
111     int version,
112     unsigned long *skip_update,
113     int *schedule_smooth);
115 static int parse_ds(
116     rrd_t *rrd,
117     char **updvals,
118     long *tmpl_idx,
119     char *input,
120     unsigned long tmpl_cnt,
121     time_t *current_time,
122     unsigned long *current_time_usec,
123     int version);
125 static int get_time_from_reading(
126     rrd_t *rrd,
127     char timesyntax,
128     char **updvals,
129     time_t *current_time,
130     unsigned long *current_time_usec,
131     int version);
133 static int update_pdp_prep(
134     rrd_t *rrd,
135     char **updvals,
136     rrd_value_t *pdp_new,
137     double interval);
139 static int calculate_elapsed_steps(
140     rrd_t *rrd,
141     unsigned long current_time,
142     unsigned long current_time_usec,
143     double interval,
144     double *pre_int,
145     double *post_int,
146     unsigned long *proc_pdp_cnt);
148 static void simple_update(
149     rrd_t *rrd,
150     double interval,
151     rrd_value_t *pdp_new);
153 static int process_all_pdp_st(
154     rrd_t *rrd,
155     double interval,
156     double pre_int,
157     double post_int,
158     unsigned long elapsed_pdp_st,
159     rrd_value_t *pdp_new,
160     rrd_value_t *pdp_temp);
162 static int process_pdp_st(
163     rrd_t *rrd,
164     unsigned long ds_idx,
165     double interval,
166     double pre_int,
167     double post_int,
168     long diff_pdp_st,
169     rrd_value_t *pdp_new,
170     rrd_value_t *pdp_temp);
172 static int update_all_cdp_prep(
173     rrd_t *rrd,
174     unsigned long *rra_step_cnt,
175     unsigned long rra_begin,
176     rrd_file_t *rrd_file,
177     unsigned long elapsed_pdp_st,
178     unsigned long proc_pdp_cnt,
179     rrd_value_t **last_seasonal_coef,
180     rrd_value_t **seasonal_coef,
181     rrd_value_t *pdp_temp,
182     unsigned long *skip_update,
183     int *schedule_smooth);
185 static int do_schedule_smooth(
186     rrd_t *rrd,
187     unsigned long rra_idx,
188     unsigned long elapsed_pdp_st);
190 static int update_cdp_prep(
191     rrd_t *rrd,
192     unsigned long elapsed_pdp_st,
193     unsigned long start_pdp_offset,
194     unsigned long *rra_step_cnt,
195     int rra_idx,
196     rrd_value_t *pdp_temp,
197     rrd_value_t *last_seasonal_coef,
198     rrd_value_t *seasonal_coef,
199     int current_cf);
201 static void update_cdp(
202     unival *scratch,
203     int current_cf,
204     rrd_value_t pdp_temp_val,
205     unsigned long rra_step_cnt,
206     unsigned long elapsed_pdp_st,
207     unsigned long start_pdp_offset,
208     unsigned long pdp_cnt,
209     rrd_value_t xff,
210     int i,
211     int ii);
213 static void initialize_cdp_val(
214     unival *scratch,
215     int current_cf,
216     rrd_value_t pdp_temp_val,
217     unsigned long elapsed_pdp_st,
218     unsigned long start_pdp_offset,
219     unsigned long pdp_cnt);
221 static void reset_cdp(
222     rrd_t *rrd,
223     unsigned long elapsed_pdp_st,
224     rrd_value_t *pdp_temp,
225     rrd_value_t *last_seasonal_coef,
226     rrd_value_t *seasonal_coef,
227     int rra_idx,
228     int ds_idx,
229     int cdp_idx,
230     enum cf_en current_cf);
232 static rrd_value_t initialize_average_carry_over(
233     rrd_value_t pdp_temp_val,
234     unsigned long elapsed_pdp_st,
235     unsigned long start_pdp_offset,
236     unsigned long pdp_cnt);
238 static rrd_value_t calculate_cdp_val(
239     rrd_value_t cdp_val,
240     rrd_value_t pdp_temp_val,
241     unsigned long elapsed_pdp_st,
242     int current_cf,
243     int i,
244     int ii);
246 static int update_aberrant_cdps(
247     rrd_t *rrd,
248     rrd_file_t *rrd_file,
249     unsigned long rra_begin,
250     unsigned long elapsed_pdp_st,
251     rrd_value_t *pdp_temp,
252     rrd_value_t **seasonal_coef);
254 static int write_to_rras(
255     rrd_t *rrd,
256     rrd_file_t *rrd_file,
257     unsigned long *rra_step_cnt,
258     unsigned long rra_begin,
259     time_t current_time,
260     unsigned long *skip_update,
261     rrd_info_t ** pcdp_summary);
263 static int write_RRA_row(
264     rrd_file_t *rrd_file,
265     rrd_t *rrd,
266     unsigned long rra_idx,
267     unsigned short CDP_scratch_idx,
268     rrd_info_t ** pcdp_summary,
269     time_t rra_time);
271 static int smooth_all_rras(
272     rrd_t *rrd,
273     rrd_file_t *rrd_file,
274     unsigned long rra_begin);
276 #ifndef HAVE_MMAP
277 static int write_changes_to_disk(
278     rrd_t *rrd,
279     rrd_file_t *rrd_file,
280     int version);
281 #endif
283 /*
284  * normalize time as returned by gettimeofday. usec part must
285  * be always >= 0
286  */
287 static inline void normalize_time(
288     struct timeval *t)
290     if (t->tv_usec < 0) {
291         t->tv_sec--;
292         t->tv_usec += 1e6L;
293     }
296 /*
297  * Sets current_time and current_time_usec based on the current time.
298  * current_time_usec is set to 0 if the version number is 1 or 2.
299  */
300 static inline void initialize_time(
301     time_t *current_time,
302     unsigned long *current_time_usec,
303     int version)
305     struct timeval tmp_time;    /* used for time conversion */
307     gettimeofday(&tmp_time, 0);
308     normalize_time(&tmp_time);
309     *current_time = tmp_time.tv_sec;
310     if (version >= 3) {
311         *current_time_usec = tmp_time.tv_usec;
312     } else {
313         *current_time_usec = 0;
314     }
317 #define IFDNAN(X,Y) (isnan(X) ? (Y) : (X));
319 rrd_info_t *rrd_update_v(
320     int argc,
321     char **argv)
323     char     *tmplt = NULL;
324     rrd_info_t *result = NULL;
325     rrd_infoval_t rc;
326     struct option long_options[] = {
327         {"template", required_argument, 0, 't'},
328         {0, 0, 0, 0}
329     };
331     rc.u_int = -1;
332     optind = 0;
333     opterr = 0;         /* initialize getopt */
335     while (1) {
336         int       option_index = 0;
337         int       opt;
339         opt = getopt_long(argc, argv, "t:", long_options, &option_index);
341         if (opt == EOF)
342             break;
344         switch (opt) {
345         case 't':
346             tmplt = optarg;
347             break;
349         case '?':
350             rrd_set_error("unknown option '%s'", argv[optind - 1]);
351             goto end_tag;
352         }
353     }
355     /* need at least 2 arguments: filename, data. */
356     if (argc - optind < 2) {
357         rrd_set_error("Not enough arguments");
358         goto end_tag;
359     }
360     rc.u_int = 0;
361     result = rrd_info_push(NULL, sprintf_alloc("return_value"), RD_I_INT, rc);
362     rc.u_int = _rrd_update(argv[optind], tmplt,
363                            argc - optind - 1,
364                            (const char **) (argv + optind + 1), result);
365     result->value.u_int = rc.u_int;
366   end_tag:
367     return result;
370 int rrd_update(
371     int argc,
372     char **argv)
374     struct option long_options[] = {
375         {"template", required_argument, 0, 't'},
376         {0, 0, 0, 0}
377     };
378     int       option_index = 0;
379     int       opt;
380     char     *tmplt = NULL;
381     int       rc = -1;
383     optind = 0;
384     opterr = 0;         /* initialize getopt */
386     while (1) {
387         opt = getopt_long(argc, argv, "t:", long_options, &option_index);
389         if (opt == EOF)
390             break;
392         switch (opt) {
393         case 't':
394             tmplt = strdup(optarg);
395             break;
397         case '?':
398             rrd_set_error("unknown option '%s'", argv[optind - 1]);
399             goto out;
400         }
401     }
403     /* need at least 2 arguments: filename, data. */
404     if (argc - optind < 2) {
405         rrd_set_error("Not enough arguments");
406         goto out;
407     }
409     rc = rrd_update_r(argv[optind], tmplt,
410                       argc - optind - 1, (const char **) (argv + optind + 1));
411   out:
412     free(tmplt);
413     return rc;
416 int rrd_update_r(
417     const char *filename,
418     const char *tmplt,
419     int argc,
420     const char **argv)
422     return _rrd_update(filename, tmplt, argc, argv, NULL);
425 int _rrd_update(
426     const char *filename,
427     const char *tmplt,
428     int argc,
429     const char **argv,
430     rrd_info_t * pcdp_summary)
433     int       arg_i = 2;
435     unsigned long rra_begin;    /* byte pointer to the rra
436                                  * area in the rrd file.  this
437                                  * pointer never changes value */
438     rrd_value_t *pdp_new;   /* prepare the incoming data to be added 
439                              * to the existing entry */
440     rrd_value_t *pdp_temp;  /* prepare the pdp values to be added 
441                              * to the cdp values */
443     long     *tmpl_idx; /* index representing the settings
444                          * transported by the tmplt index */
445     unsigned long tmpl_cnt = 2; /* time and data */
446     rrd_t     rrd;
447     time_t    current_time = 0;
448     unsigned long current_time_usec = 0;    /* microseconds part of current time */
449     char    **updvals;
450     int       schedule_smooth = 0;
452     /* number of elapsed PDP steps since last update */
453     unsigned long *rra_step_cnt = NULL;
455     int       version;  /* rrd version */
456     rrd_file_t *rrd_file;
457     char     *arg_copy; /* for processing the argv */
458     unsigned long *skip_update; /* RRAs to advance but not write */
460     /* need at least 1 arguments: data. */
461     if (argc < 1) {
462         rrd_set_error("Not enough arguments");
463         goto err_out;
464     }
466     if ((rrd_file = rrd_open(filename, &rrd, RRD_READWRITE)) == NULL) {
467         goto err_free;
468     }
469     /* We are now at the beginning of the rra's */
470     rra_begin = rrd_file->header_len;
472     version = atoi(rrd.stat_head->version);
474     initialize_time(&current_time, &current_time_usec, version);
476     /* get exclusive lock to whole file.
477      * lock gets removed when we close the file.
478      */
479     if (rrd_lock(rrd_file) != 0) {
480         rrd_set_error("could not lock RRD");
481         goto err_close;
482     }
484     if (allocate_data_structures(&rrd, &updvals,
485                                  &pdp_temp, tmplt, &tmpl_idx, &tmpl_cnt,
486                                  &rra_step_cnt, &skip_update,
487                                  &pdp_new) == -1) {
488         goto err_close;
489     }
491     /* loop through the arguments. */
492     for (arg_i = 0; arg_i < argc; arg_i++) {
493         if ((arg_copy = strdup(argv[arg_i])) == NULL) {
494             rrd_set_error("failed duplication argv entry");
495             break;
496         }
497         if (process_arg(arg_copy, &rrd, rrd_file, rra_begin,
498                         &current_time, &current_time_usec, pdp_temp, pdp_new,
499                         rra_step_cnt, updvals, tmpl_idx, tmpl_cnt,
500                         &pcdp_summary, version, skip_update,
501                         &schedule_smooth) == -1) {
502             if (rrd_test_error()) { /* Should have error string always here */
503                 char     *save_error;
505                 /* Prepend file name to error message */
506                 if ((save_error = strdup(rrd_get_error())) != NULL) {
507                     rrd_set_error("%s: %s", filename, save_error);
508                     free(save_error);
509                 }
510             }
511             free(arg_copy);
512             break;
513         }
514         free(arg_copy);
515     }
517     free(rra_step_cnt);
519     /* if we got here and if there is an error and if the file has not been
520      * written to, then close things up and return. */
521     if (rrd_test_error()) {
522         goto err_free_structures;
523     }
524 #ifndef HAVE_MMAP
525     if (write_changes_to_disk(&rrd, rrd_file, version) == -1) {
526         goto err_free_structures;
527     }
528 #endif
530     /* calling the smoothing code here guarantees at most one smoothing
531      * operation per rrd_update call. Unfortunately, it is possible with bulk
532      * updates, or a long-delayed update for smoothing to occur off-schedule.
533      * This really isn't critical except during the burn-in cycles. */
534     if (schedule_smooth) {
535         smooth_all_rras(&rrd, rrd_file, rra_begin);
536     }
538 /*    rrd_dontneed(rrd_file,&rrd); */
539     rrd_free(&rrd);
540     rrd_close(rrd_file);
542     free(pdp_new);
543     free(tmpl_idx);
544     free(pdp_temp);
545     free(skip_update);
546     free(updvals);
547     return 0;
549   err_free_structures:
550     free(pdp_new);
551     free(tmpl_idx);
552     free(pdp_temp);
553     free(skip_update);
554     free(updvals);
555   err_close:
556     rrd_close(rrd_file);
557   err_free:
558     rrd_free(&rrd);
559   err_out:
560     return -1;
563 /*
564  * get exclusive lock to whole file.
565  * lock gets removed when we close the file
566  *
567  * returns 0 on success
568  */
569 int rrd_lock(
570     rrd_file_t *file)
572     int       rcstat;
574     {
575 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
576         struct _stat st;
578         if (_fstat(file->fd, &st) == 0) {
579             rcstat = _locking(file->fd, _LK_NBLCK, st.st_size);
580         } else {
581             rcstat = -1;
582         }
583 #else
584         struct flock lock;
586         lock.l_type = F_WRLCK;  /* exclusive write lock */
587         lock.l_len = 0; /* whole file */
588         lock.l_start = 0;   /* start of file */
589         lock.l_whence = SEEK_SET;   /* end of file */
591         rcstat = fcntl(file->fd, F_SETLK, &lock);
592 #endif
593     }
595     return (rcstat);
598 /*
599  * Allocate some important arrays used, and initialize the template.
600  *
601  * When it returns, either all of the structures are allocated
602  * or none of them are.
603  *
604  * Returns 0 on success, -1 on error.
605  */
606 static int allocate_data_structures(
607     rrd_t *rrd,
608     char ***updvals,
609     rrd_value_t **pdp_temp,
610     const char *tmplt,
611     long **tmpl_idx,
612     unsigned long *tmpl_cnt,
613     unsigned long **rra_step_cnt,
614     unsigned long **skip_update,
615     rrd_value_t **pdp_new)
617     unsigned  i, ii;
618     if ((*updvals = (char **) malloc(sizeof(char *)
619                                      * (rrd->stat_head->ds_cnt + 1))) == NULL) {
620         rrd_set_error("allocating updvals pointer array.");
621         return -1;
622     }
623     if ((*pdp_temp = (rrd_value_t *) malloc(sizeof(rrd_value_t)
624                                             * rrd->stat_head->ds_cnt)) ==
625         NULL) {
626         rrd_set_error("allocating pdp_temp.");
627         goto err_free_updvals;
628     }
629     if ((*skip_update = (unsigned long *) malloc(sizeof(unsigned long)
630                                                  *
631                                                  rrd->stat_head->rra_cnt)) ==
632         NULL) {
633         rrd_set_error("allocating skip_update.");
634         goto err_free_pdp_temp;
635     }
636     if ((*tmpl_idx = (long *) malloc(sizeof(unsigned long)
637                                      * (rrd->stat_head->ds_cnt + 1))) == NULL) {
638         rrd_set_error("allocating tmpl_idx.");
639         goto err_free_skip_update;
640     }
641     if ((*rra_step_cnt = (unsigned long *) malloc(sizeof(unsigned long)
642                                                   *
643                                                   (rrd->stat_head->
644                                                    rra_cnt))) == NULL) {
645         rrd_set_error("allocating rra_step_cnt.");
646         goto err_free_tmpl_idx;
647     }
649     /* initialize tmplt redirector */
650     /* default config example (assume DS 1 is a CDEF DS)
651        tmpl_idx[0] -> 0; (time)
652        tmpl_idx[1] -> 1; (DS 0)
653        tmpl_idx[2] -> 3; (DS 2)
654        tmpl_idx[3] -> 4; (DS 3) */
655     (*tmpl_idx)[0] = 0; /* time */
656     for (i = 1, ii = 1; i <= rrd->stat_head->ds_cnt; i++) {
657         if (dst_conv(rrd->ds_def[i - 1].dst) != DST_CDEF)
658             (*tmpl_idx)[ii++] = i;
659     }
660     *tmpl_cnt = ii;
662     if (tmplt != NULL) {
663         if (parse_template(rrd, tmplt, tmpl_cnt, *tmpl_idx) == -1) {
664             goto err_free_rra_step_cnt;
665         }
666     }
668     if ((*pdp_new = (rrd_value_t *) malloc(sizeof(rrd_value_t)
669                                            * rrd->stat_head->ds_cnt)) == NULL) {
670         rrd_set_error("allocating pdp_new.");
671         goto err_free_rra_step_cnt;
672     }
674     return 0;
676   err_free_rra_step_cnt:
677     free(*rra_step_cnt);
678   err_free_tmpl_idx:
679     free(*tmpl_idx);
680   err_free_skip_update:
681     free(*skip_update);
682   err_free_pdp_temp:
683     free(*pdp_temp);
684   err_free_updvals:
685     free(*updvals);
686     return -1;
689 /*
690  * Parses tmplt and puts an ordered list of DS's into tmpl_idx.
691  *
692  * Returns 0 on success.
693  */
694 static int parse_template(
695     rrd_t *rrd,
696     const char *tmplt,
697     unsigned long *tmpl_cnt,
698     long *tmpl_idx)
700     char     *dsname, *tmplt_copy;
701     unsigned int tmpl_len, i;
702     int       ret = 0;
704     *tmpl_cnt = 1;      /* the first entry is the time */
706     /* we should work on a writeable copy here */
707     if ((tmplt_copy = strdup(tmplt)) == NULL) {
708         rrd_set_error("error copying tmplt '%s'", tmplt);
709         ret = -1;
710         goto out;
711     }
713     dsname = tmplt_copy;
714     tmpl_len = strlen(tmplt_copy);
715     for (i = 0; i <= tmpl_len; i++) {
716         if (tmplt_copy[i] == ':' || tmplt_copy[i] == '\0') {
717             tmplt_copy[i] = '\0';
718             if (*tmpl_cnt > rrd->stat_head->ds_cnt) {
719                 rrd_set_error("tmplt contains more DS definitions than RRD");
720                 ret = -1;
721                 goto out_free_tmpl_copy;
722             }
723             if ((tmpl_idx[(*tmpl_cnt)++] = ds_match(rrd, dsname) + 1) == 0) {
724                 rrd_set_error("unknown DS name '%s'", dsname);
725                 ret = -1;
726                 goto out_free_tmpl_copy;
727             }
728             /* go to the next entry on the tmplt_copy */
729             if (i < tmpl_len)
730                 dsname = &tmplt_copy[i + 1];
731         }
732     }
733   out_free_tmpl_copy:
734     free(tmplt_copy);
735   out:
736     return ret;
739 /*
740  * Parse an update string, updates the primary data points (PDPs)
741  * and consolidated data points (CDPs), and writes changes to the RRAs.
742  *
743  * Returns 0 on success, -1 on error.
744  */
745 static int process_arg(
746     char *step_start,
747     rrd_t *rrd,
748     rrd_file_t *rrd_file,
749     unsigned long rra_begin,
750     time_t *current_time,
751     unsigned long *current_time_usec,
752     rrd_value_t *pdp_temp,
753     rrd_value_t *pdp_new,
754     unsigned long *rra_step_cnt,
755     char **updvals,
756     long *tmpl_idx,
757     unsigned long tmpl_cnt,
758     rrd_info_t ** pcdp_summary,
759     int version,
760     unsigned long *skip_update,
761     int *schedule_smooth)
763     rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
765     /* a vector of future Holt-Winters seasonal coefs */
766     unsigned long elapsed_pdp_st;
768     double    interval, pre_int, post_int;  /* interval between this and
769                                              * the last run */
770     unsigned long proc_pdp_cnt;
772     if (parse_ds(rrd, updvals, tmpl_idx, step_start, tmpl_cnt,
773                  current_time, current_time_usec, version) == -1) {
774         return -1;
775     }
777     interval = (double) (*current_time - rrd->live_head->last_up)
778         + (double) ((long) *current_time_usec -
779                     (long) rrd->live_head->last_up_usec) / 1e6f;
781     /* process the data sources and update the pdp_prep 
782      * area accordingly */
783     if (update_pdp_prep(rrd, updvals, pdp_new, interval) == -1) {
784         return -1;
785     }
787     elapsed_pdp_st = calculate_elapsed_steps(rrd,
788                                              *current_time,
789                                              *current_time_usec, interval,
790                                              &pre_int, &post_int,
791                                              &proc_pdp_cnt);
793     /* has a pdp_st moment occurred since the last run ? */
794     if (elapsed_pdp_st == 0) {
795         /* no we have not passed a pdp_st moment. therefore update is simple */
796         simple_update(rrd, interval, pdp_new);
797     } else {
798         /* an pdp_st has occurred. */
799         if (process_all_pdp_st(rrd, interval,
800                                pre_int, post_int,
801                                elapsed_pdp_st, pdp_new, pdp_temp) == -1) {
802             return -1;
803         }
804         if (update_all_cdp_prep(rrd, rra_step_cnt,
805                                 rra_begin, rrd_file,
806                                 elapsed_pdp_st,
807                                 proc_pdp_cnt,
808                                 &last_seasonal_coef,
809                                 &seasonal_coef,
810                                 pdp_temp,
811                                 skip_update, schedule_smooth) == -1) {
812             goto err_free_coefficients;
813         }
814         if (update_aberrant_cdps(rrd, rrd_file, rra_begin,
815                                  elapsed_pdp_st, pdp_temp,
816                                  &seasonal_coef) == -1) {
817             goto err_free_coefficients;
818         }
819         if (write_to_rras(rrd, rrd_file, rra_step_cnt, rra_begin,
820                           *current_time, skip_update,
821                           pcdp_summary) == -1) {
822             goto err_free_coefficients;
823         }
824     }                   /* endif a pdp_st has occurred */
825     rrd->live_head->last_up = *current_time;
826     rrd->live_head->last_up_usec = *current_time_usec;
828     if (version < 3) {
829         *rrd->legacy_last_up = rrd->live_head->last_up;
830     }
831     free(seasonal_coef);
832     free(last_seasonal_coef);
833     return 0;
835   err_free_coefficients:
836     free(seasonal_coef);
837     free(last_seasonal_coef);
838     return -1;
841 /*
842  * Parse a DS string (time + colon-separated values), storing the
843  * results in current_time, current_time_usec, and updvals.
844  *
845  * Returns 0 on success, -1 on error.
846  */
847 static int parse_ds(
848     rrd_t *rrd,
849     char **updvals,
850     long *tmpl_idx,
851     char *input,
852     unsigned long tmpl_cnt,
853     time_t *current_time,
854     unsigned long *current_time_usec,
855     int version)
857     char     *p;
858     unsigned long i;
859     char      timesyntax;
861     updvals[0] = input;
862     /* initialize all ds input to unknown except the first one
863        which has always got to be set */
864     for (i = 1; i <= rrd->stat_head->ds_cnt; i++)
865         updvals[i] = "U";
867     /* separate all ds elements; first must be examined separately
868        due to alternate time syntax */
869     if ((p = strchr(input, '@')) != NULL) {
870         timesyntax = '@';
871     } else if ((p = strchr(input, ':')) != NULL) {
872         timesyntax = ':';
873     } else {
874         rrd_set_error("expected timestamp not found in data source from %s",
875                       input);
876         return -1;
877     }
878     *p = '\0';
879     i = 1;
880     updvals[tmpl_idx[i++]] = p + 1;
881     while (*(++p)) {
882         if (*p == ':') {
883             *p = '\0';
884             if (i < tmpl_cnt) {
885                 updvals[tmpl_idx[i++]] = p + 1;
886             }
887         }
888     }
890     if (i != tmpl_cnt) {
891         rrd_set_error("expected %lu data source readings (got %lu) from %s",
892                       tmpl_cnt - 1, i, input);
893         return -1;
894     }
896     if (get_time_from_reading(rrd, timesyntax, updvals,
897                               current_time, current_time_usec,
898                               version) == -1) {
899         return -1;
900     }
901     return 0;
904 /*
905  * Parse the time in a DS string, store it in current_time and 
906  * current_time_usec and verify that it's later than the last
907  * update for this DS.
908  *
909  * Returns 0 on success, -1 on error.
910  */
911 static int get_time_from_reading(
912     rrd_t *rrd,
913     char timesyntax,
914     char **updvals,
915     time_t *current_time,
916     unsigned long *current_time_usec,
917     int version)
919     double    tmp;
920     char     *parsetime_error = NULL;
921     char     *old_locale;
922     rrd_time_value_t ds_tv;
923     struct timeval tmp_time;    /* used for time conversion */
925     /* get the time from the reading ... handle N */
926     if (timesyntax == '@') {    /* at-style */
927         if ((parsetime_error = rrd_parsetime(updvals[0], &ds_tv))) {
928             rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error);
929             return -1;
930         }
931         if (ds_tv.type == RELATIVE_TO_END_TIME ||
932             ds_tv.type == RELATIVE_TO_START_TIME) {
933             rrd_set_error("specifying time relative to the 'start' "
934                           "or 'end' makes no sense here: %s", updvals[0]);
935             return -1;
936         }
937         *current_time = mktime(&ds_tv.tm) +ds_tv.offset;
938         *current_time_usec = 0; /* FIXME: how to handle usecs here ? */
939     } else if (strcmp(updvals[0], "N") == 0) {
940         gettimeofday(&tmp_time, 0);
941         normalize_time(&tmp_time);
942         *current_time = tmp_time.tv_sec;
943         *current_time_usec = tmp_time.tv_usec;
944     } else {
945         old_locale = setlocale(LC_NUMERIC, "C");
946         errno = 0;
947         tmp = strtod(updvals[0], 0);
948         if (errno > 0) {
949             rrd_set_error("converting '%s' to float: %s",
950                 updvals[0], rrd_strerror(errno));
951             return -1;
952         };
953         setlocale(LC_NUMERIC, old_locale);
954         if (tmp < 0.0){
955             gettimeofday(&tmp_time, 0);
956             tmp = (double)tmp_time.tv_sec + (double)tmp_time.tv_usec * 1e-6f + tmp;
957         }
959         *current_time = floor(tmp);
960         *current_time_usec = (long) ((tmp - (double) *current_time) * 1e6f);
961     }
962     /* dont do any correction for old version RRDs */
963     if (version < 3)
964         *current_time_usec = 0;
966     if (*current_time < rrd->live_head->last_up ||
967         (*current_time == rrd->live_head->last_up &&
968          (long) *current_time_usec <= (long) rrd->live_head->last_up_usec)) {
969         rrd_set_error("illegal attempt to update using time %ld when "
970                       "last update time is %ld (minimum one second step)",
971                       *current_time, rrd->live_head->last_up);
972         return -1;
973     }
974     return 0;
977 /*
978  * Update pdp_new by interpreting the updvals according to the DS type
979  * (COUNTER, GAUGE, etc.).
980  *
981  * Returns 0 on success, -1 on error.
982  */
983 static int update_pdp_prep(
984     rrd_t *rrd,
985     char **updvals,
986     rrd_value_t *pdp_new,
987     double interval)
989     unsigned long ds_idx;
990     int       ii;
991     char     *endptr;   /* used in the conversion */
992     double    rate;
993     char     *old_locale;
994     enum dst_en dst_idx;
996     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
997         dst_idx = dst_conv(rrd->ds_def[ds_idx].dst);
999         /* make sure we do not build diffs with old last_ds values */
1000         if (rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt < interval) {
1001             strncpy(rrd->pdp_prep[ds_idx].last_ds, "U", LAST_DS_LEN - 1);
1002             rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
1003         }
1005         /* NOTE: DST_CDEF should never enter this if block, because
1006          * updvals[ds_idx+1][0] is initialized to 'U'; unless the caller
1007          * accidently specified a value for the DST_CDEF. To handle this case,
1008          * an extra check is required. */
1010         if ((updvals[ds_idx + 1][0] != 'U') &&
1011             (dst_idx != DST_CDEF) &&
1012             rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt >= interval) {
1013             rate = DNAN;
1015             /* pdp_new contains rate * time ... eg the bytes transferred during
1016              * the interval. Doing it this way saves a lot of math operations
1017              */
1018             switch (dst_idx) {
1019             case DST_COUNTER:
1020             case DST_DERIVE:
1021                 for (ii = 0; updvals[ds_idx + 1][ii] != '\0'; ii++) {
1022                     if ((updvals[ds_idx + 1][ii] < '0'
1023                          || updvals[ds_idx + 1][ii] > '9')
1024                         && (ii != 0 && updvals[ds_idx + 1][ii] != '-')) {
1025                         rrd_set_error("not a simple integer: '%s'",
1026                                       updvals[ds_idx + 1]);
1027                         return -1;
1028                     }
1029                 }
1030                 if (rrd->pdp_prep[ds_idx].last_ds[0] != 'U') {
1031                     pdp_new[ds_idx] =
1032                         rrd_diff(updvals[ds_idx + 1],
1033                                  rrd->pdp_prep[ds_idx].last_ds);
1034                     if (dst_idx == DST_COUNTER) {
1035                         /* simple overflow catcher. This will fail
1036                          * terribly for non 32 or 64 bit counters
1037                          * ... are there any others in SNMP land?
1038                          */
1039                         if (pdp_new[ds_idx] < (double) 0.0)
1040                             pdp_new[ds_idx] += (double) 4294967296.0;   /* 2^32 */
1041                         if (pdp_new[ds_idx] < (double) 0.0)
1042                             pdp_new[ds_idx] += (double) 18446744069414584320.0; /* 2^64-2^32 */
1043                     }
1044                     rate = pdp_new[ds_idx] / interval;
1045                 } else {
1046                     pdp_new[ds_idx] = DNAN;
1047                 }
1048                 break;
1049             case DST_ABSOLUTE:
1050                 old_locale = setlocale(LC_NUMERIC, "C");
1051                 errno = 0;
1052                 pdp_new[ds_idx] = strtod(updvals[ds_idx + 1], &endptr);
1053                 if (errno > 0) {
1054                     rrd_set_error("converting '%s' to float: %s",
1055                                   updvals[ds_idx + 1], rrd_strerror(errno));
1056                     return -1;
1057                 };
1058                 setlocale(LC_NUMERIC, old_locale);
1059                 if (endptr[0] != '\0') {
1060                     rrd_set_error
1061                         ("conversion of '%s' to float not complete: tail '%s'",
1062                          updvals[ds_idx + 1], endptr);
1063                     return -1;
1064                 }
1065                 rate = pdp_new[ds_idx] / interval;
1066                 break;
1067             case DST_GAUGE:
1068                 old_locale = setlocale(LC_NUMERIC, "C");
1069                 errno = 0;
1070                 pdp_new[ds_idx] =
1071                     strtod(updvals[ds_idx + 1], &endptr) * interval;
1072                 if (errno) {
1073                     rrd_set_error("converting '%s' to float: %s",
1074                                   updvals[ds_idx + 1], rrd_strerror(errno));
1075                     return -1;
1076                 };
1077                 setlocale(LC_NUMERIC, old_locale);
1078                 if (endptr[0] != '\0') {
1079                     rrd_set_error
1080                         ("conversion of '%s' to float not complete: tail '%s'",
1081                          updvals[ds_idx + 1], endptr);
1082                     return -1;
1083                 }
1084                 rate = pdp_new[ds_idx] / interval;
1085                 break;
1086             default:
1087                 rrd_set_error("rrd contains unknown DS type : '%s'",
1088                               rrd->ds_def[ds_idx].dst);
1089                 return -1;
1090             }
1091             /* break out of this for loop if the error string is set */
1092             if (rrd_test_error()) {
1093                 return -1;
1094             }
1095             /* make sure pdp_temp is neither too large or too small
1096              * if any of these occur it becomes unknown ...
1097              * sorry folks ... */
1098             if (!isnan(rate) &&
1099                 ((!isnan(rrd->ds_def[ds_idx].par[DS_max_val].u_val) &&
1100                   rate > rrd->ds_def[ds_idx].par[DS_max_val].u_val) ||
1101                  (!isnan(rrd->ds_def[ds_idx].par[DS_min_val].u_val) &&
1102                   rate < rrd->ds_def[ds_idx].par[DS_min_val].u_val))) {
1103                 pdp_new[ds_idx] = DNAN;
1104             }
1105         } else {
1106             /* no news is news all the same */
1107             pdp_new[ds_idx] = DNAN;
1108         }
1111         /* make a copy of the command line argument for the next run */
1112 #ifdef DEBUG
1113         fprintf(stderr, "prep ds[%lu]\t"
1114                 "last_arg '%s'\t"
1115                 "this_arg '%s'\t"
1116                 "pdp_new %10.2f\n",
1117                 ds_idx, rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1118                 pdp_new[ds_idx]);
1119 #endif
1120         strncpy(rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1121                 LAST_DS_LEN - 1);
1122         rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
1123     }
1124     return 0;
1127 /*
1128  * How many PDP steps have elapsed since the last update? Returns the answer,
1129  * and stores the time between the last update and the last PDP in pre_time,
1130  * and the time between the last PDP and the current time in post_int.
1131  */
1132 static int calculate_elapsed_steps(
1133     rrd_t *rrd,
1134     unsigned long current_time,
1135     unsigned long current_time_usec,
1136     double interval,
1137     double *pre_int,
1138     double *post_int,
1139     unsigned long *proc_pdp_cnt)
1141     unsigned long proc_pdp_st;  /* which pdp_st was the last to be processed */
1142     unsigned long occu_pdp_st;  /* when was the pdp_st before the last update
1143                                  * time */
1144     unsigned long proc_pdp_age; /* how old was the data in the pdp prep area 
1145                                  * when it was last updated */
1146     unsigned long occu_pdp_age; /* how long ago was the last pdp_step time */
1148     /* when was the current pdp started */
1149     proc_pdp_age = rrd->live_head->last_up % rrd->stat_head->pdp_step;
1150     proc_pdp_st = rrd->live_head->last_up - proc_pdp_age;
1152     /* when did the last pdp_st occur */
1153     occu_pdp_age = current_time % rrd->stat_head->pdp_step;
1154     occu_pdp_st = current_time - occu_pdp_age;
1156     if (occu_pdp_st > proc_pdp_st) {
1157         /* OK we passed the pdp_st moment */
1158         *pre_int = (long) occu_pdp_st - rrd->live_head->last_up;    /* how much of the input data
1159                                                                      * occurred before the latest
1160                                                                      * pdp_st moment*/
1161         *pre_int -= ((double) rrd->live_head->last_up_usec) / 1e6f; /* adjust usecs */
1162         *post_int = occu_pdp_age;   /* how much after it */
1163         *post_int += ((double) current_time_usec) / 1e6f;   /* adjust usecs */
1164     } else {
1165         *pre_int = interval;
1166         *post_int = 0;
1167     }
1169     *proc_pdp_cnt = proc_pdp_st / rrd->stat_head->pdp_step;
1171 #ifdef DEBUG
1172     printf("proc_pdp_age %lu\t"
1173            "proc_pdp_st %lu\t"
1174            "occu_pfp_age %lu\t"
1175            "occu_pdp_st %lu\t"
1176            "int %lf\t"
1177            "pre_int %lf\t"
1178            "post_int %lf\n", proc_pdp_age, proc_pdp_st,
1179            occu_pdp_age, occu_pdp_st, interval, *pre_int, *post_int);
1180 #endif
1182     /* compute the number of elapsed pdp_st moments */
1183     return (occu_pdp_st - proc_pdp_st) / rrd->stat_head->pdp_step;
1186 /*
1187  * Increment the PDP values by the values in pdp_new, or else initialize them.
1188  */
1189 static void simple_update(
1190     rrd_t *rrd,
1191     double interval,
1192     rrd_value_t *pdp_new)
1194     int       i;
1196     for (i = 0; i < (signed) rrd->stat_head->ds_cnt; i++) {
1197         if (isnan(pdp_new[i])) {
1198             /* this is not really accurate if we use subsecond data arrival time
1199                should have thought of it when going subsecond resolution ...
1200                sorry next format change we will have it! */
1201             rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt +=
1202                 floor(interval);
1203         } else {
1204             if (isnan(rrd->pdp_prep[i].scratch[PDP_val].u_val)) {
1205                 rrd->pdp_prep[i].scratch[PDP_val].u_val = pdp_new[i];
1206             } else {
1207                 rrd->pdp_prep[i].scratch[PDP_val].u_val += pdp_new[i];
1208             }
1209         }
1210 #ifdef DEBUG
1211         fprintf(stderr,
1212                 "NO PDP  ds[%i]\t"
1213                 "value %10.2f\t"
1214                 "unkn_sec %5lu\n",
1215                 i,
1216                 rrd->pdp_prep[i].scratch[PDP_val].u_val,
1217                 rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
1218 #endif
1219     }
1222 /*
1223  * Call process_pdp_st for each DS.
1224  *
1225  * Returns 0 on success, -1 on error.
1226  */
1227 static int process_all_pdp_st(
1228     rrd_t *rrd,
1229     double interval,
1230     double pre_int,
1231     double post_int,
1232     unsigned long elapsed_pdp_st,
1233     rrd_value_t *pdp_new,
1234     rrd_value_t *pdp_temp)
1236     unsigned long ds_idx;
1238     /* in pdp_prep[].scratch[PDP_val].u_val we have collected
1239        rate*seconds which occurred up to the last run.
1240        pdp_new[] contains rate*seconds from the latest run.
1241        pdp_temp[] will contain the rate for cdp */
1243     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1244         if (process_pdp_st(rrd, ds_idx, interval, pre_int, post_int,
1245                            elapsed_pdp_st * rrd->stat_head->pdp_step,
1246                            pdp_new, pdp_temp) == -1) {
1247             return -1;
1248         }
1249 #ifdef DEBUG
1250         fprintf(stderr, "PDP UPD ds[%lu]\t"
1251                 "elapsed_pdp_st %lu\t"
1252                 "pdp_temp %10.2f\t"
1253                 "new_prep %10.2f\t"
1254                 "new_unkn_sec %5lu\n",
1255                 ds_idx,
1256                 elapsed_pdp_st,
1257                 pdp_temp[ds_idx],
1258                 rrd->pdp_prep[ds_idx].scratch[PDP_val].u_val,
1259                 rrd->pdp_prep[ds_idx].scratch[PDP_unkn_sec_cnt].u_cnt);
1260 #endif
1261     }
1262     return 0;
1265 /*
1266  * Process an update that occurs after one of the PDP moments.
1267  * Increments the PDP value, sets NAN if time greater than the
1268  * heartbeats have elapsed, processes CDEFs.
1269  *
1270  * Returns 0 on success, -1 on error.
1271  */
1272 static int process_pdp_st(
1273     rrd_t *rrd,
1274     unsigned long ds_idx,
1275     double interval,
1276     double pre_int,
1277     double post_int,
1278     long diff_pdp_st,   /* number of seconds in full steps passed since last update */
1279     rrd_value_t *pdp_new,
1280     rrd_value_t *pdp_temp)
1282     int       i;
1284     /* update pdp_prep to the current pdp_st. */
1285     double    pre_unknown = 0.0;
1286     unival   *scratch = rrd->pdp_prep[ds_idx].scratch;
1287     unsigned long mrhb = rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt;
1289     rpnstack_t rpnstack;    /* used for COMPUTE DS */
1291     rpnstack_init(&rpnstack);
1294     if (isnan(pdp_new[ds_idx])) {
1295         /* a final bit of unknown to be added before calculation
1296            we use a temporary variable for this so that we
1297            don't have to turn integer lines before using the value */
1298         pre_unknown = pre_int;
1299     } else {
1300         if (isnan(scratch[PDP_val].u_val)) {
1301             scratch[PDP_val].u_val = 0;
1302         }
1303         scratch[PDP_val].u_val += pdp_new[ds_idx] / interval * pre_int;
1304     }
1306     /* if too much of the pdp_prep is unknown we dump it */
1307     /* if the interval is larger thatn mrhb we get NAN */
1308     if ((interval > mrhb) ||
1309         (rrd->stat_head->pdp_step / 2.0 <
1310          (signed) scratch[PDP_unkn_sec_cnt].u_cnt)) {
1311         pdp_temp[ds_idx] = DNAN;
1312     } else {
1313         pdp_temp[ds_idx] = scratch[PDP_val].u_val /
1314             ((double) (diff_pdp_st - scratch[PDP_unkn_sec_cnt].u_cnt) -
1315              pre_unknown);
1316     }
1318     /* process CDEF data sources; remember each CDEF DS can
1319      * only reference other DS with a lower index number */
1320     if (dst_conv(rrd->ds_def[ds_idx].dst) == DST_CDEF) {
1321         rpnp_t   *rpnp;
1323         rpnp =
1324             rpn_expand((rpn_cdefds_t *) &(rrd->ds_def[ds_idx].par[DS_cdef]));
1325         /* substitute data values for OP_VARIABLE nodes */
1326         for (i = 0; rpnp[i].op != OP_END; i++) {
1327             if (rpnp[i].op == OP_VARIABLE) {
1328                 rpnp[i].op = OP_NUMBER;
1329                 rpnp[i].val = pdp_temp[rpnp[i].ptr];
1330             }
1331         }
1332         /* run the rpn calculator */
1333         if (rpn_calc(rpnp, &rpnstack, 0, pdp_temp, ds_idx) == -1) {
1334             free(rpnp);
1335             rpnstack_free(&rpnstack);
1336             return -1;
1337         }
1338     }
1340     /* make pdp_prep ready for the next run */
1341     if (isnan(pdp_new[ds_idx])) {
1342         /* this is not realy accurate if we use subsecond data arival time
1343            should have thought of it when going subsecond resolution ...
1344            sorry next format change we will have it! */
1345         scratch[PDP_unkn_sec_cnt].u_cnt = floor(post_int);
1346         scratch[PDP_val].u_val = DNAN;
1347     } else {
1348         scratch[PDP_unkn_sec_cnt].u_cnt = 0;
1349         scratch[PDP_val].u_val = pdp_new[ds_idx] / interval * post_int;
1350     }
1351     rpnstack_free(&rpnstack);
1352     return 0;
1355 /*
1356  * Iterate over all the RRAs for a given DS and:
1357  * 1. Decide whether to schedule a smooth later
1358  * 2. Decide whether to skip updating SEASONAL and DEVSEASONAL
1359  * 3. Update the CDP
1360  *
1361  * Returns 0 on success, -1 on error
1362  */
1363 static int update_all_cdp_prep(
1364     rrd_t *rrd,
1365     unsigned long *rra_step_cnt,
1366     unsigned long rra_begin,
1367     rrd_file_t *rrd_file,
1368     unsigned long elapsed_pdp_st,
1369     unsigned long proc_pdp_cnt,
1370     rrd_value_t **last_seasonal_coef,
1371     rrd_value_t **seasonal_coef,
1372     rrd_value_t *pdp_temp,
1373     unsigned long *skip_update,
1374     int *schedule_smooth)
1376     unsigned long rra_idx;
1378     /* index into the CDP scratch array */
1379     enum cf_en current_cf;
1380     unsigned long rra_start;
1382     /* number of rows to be updated in an RRA for a data value. */
1383     unsigned long start_pdp_offset;
1385     rra_start = rra_begin;
1386     for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1387         current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1388         start_pdp_offset =
1389             rrd->rra_def[rra_idx].pdp_cnt -
1390             proc_pdp_cnt % rrd->rra_def[rra_idx].pdp_cnt;
1391         skip_update[rra_idx] = 0;
1392         if (start_pdp_offset <= elapsed_pdp_st) {
1393             rra_step_cnt[rra_idx] = (elapsed_pdp_st - start_pdp_offset) /
1394                 rrd->rra_def[rra_idx].pdp_cnt + 1;
1395         } else {
1396             rra_step_cnt[rra_idx] = 0;
1397         }
1399         if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1400             /* If this is a bulk update, we need to skip ahead in the seasonal arrays
1401              * so that they will be correct for the next observed value; note that for
1402              * the bulk update itself, no update will occur to DEVSEASONAL or SEASONAL;
1403              * futhermore, HWPREDICT and DEVPREDICT will be set to DNAN. */
1404             if (rra_step_cnt[rra_idx] > 1) {
1405                 skip_update[rra_idx] = 1;
1406                 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1407                                 elapsed_pdp_st, last_seasonal_coef);
1408                 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1409                                 elapsed_pdp_st + 1, seasonal_coef);
1410             }
1411             /* periodically run a smoother for seasonal effects */
1412             if (do_schedule_smooth(rrd, rra_idx, elapsed_pdp_st)) {
1413 #ifdef DEBUG
1414                 fprintf(stderr,
1415                         "schedule_smooth: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
1416                         rrd->rra_ptr[rra_idx].cur_row, elapsed_pdp_st,
1417                         rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].
1418                         u_cnt);
1419 #endif
1420                 *schedule_smooth = 1;
1421             }
1422         }
1423         if (rrd_test_error())
1424             return -1;
1426         if (update_cdp_prep
1427             (rrd, elapsed_pdp_st, start_pdp_offset, rra_step_cnt, rra_idx,
1428              pdp_temp, *last_seasonal_coef, *seasonal_coef,
1429              current_cf) == -1) {
1430             return -1;
1431         }
1432         rra_start +=
1433             rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt *
1434             sizeof(rrd_value_t);
1435     }
1436     return 0;
1439 /* 
1440  * Are we due for a smooth? Also increments our position in the burn-in cycle.
1441  */
1442 static int do_schedule_smooth(
1443     rrd_t *rrd,
1444     unsigned long rra_idx,
1445     unsigned long elapsed_pdp_st)
1447     unsigned long cdp_idx = rra_idx * (rrd->stat_head->ds_cnt);
1448     unsigned long cur_row = rrd->rra_ptr[rra_idx].cur_row;
1449     unsigned long row_cnt = rrd->rra_def[rra_idx].row_cnt;
1450     unsigned long seasonal_smooth_idx =
1451         rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].u_cnt;
1452     unsigned long *init_seasonal =
1453         &(rrd->cdp_prep[cdp_idx].scratch[CDP_init_seasonal].u_cnt);
1455     /* Need to use first cdp parameter buffer to track burnin (burnin requires
1456      * a specific smoothing schedule).  The CDP_init_seasonal parameter is
1457      * really an RRA level, not a data source within RRA level parameter, but
1458      * the rra_def is read only for rrd_update (not flushed to disk). */
1459     if (*init_seasonal > BURNIN_CYCLES) {
1460         /* someone has no doubt invented a trick to deal with this wrap around,
1461          * but at least this code is clear. */
1462         if (seasonal_smooth_idx > cur_row) {
1463             /* here elapsed_pdp_st = rra_step_cnt[rra_idx] because of 1-1 mapping
1464              * between PDP and CDP */
1465             return (cur_row + elapsed_pdp_st >= seasonal_smooth_idx);
1466         }
1467         /* can't rely on negative numbers because we are working with
1468          * unsigned values */
1469         return (cur_row + elapsed_pdp_st >= row_cnt
1470                 && cur_row + elapsed_pdp_st >= row_cnt + seasonal_smooth_idx);
1471     }
1472     /* mark off one of the burn-in cycles */
1473     return (cur_row + elapsed_pdp_st >= row_cnt && ++(*init_seasonal));
1476 /*
1477  * For a given RRA, iterate over the data sources and call the appropriate
1478  * consolidation function.
1479  *
1480  * Returns 0 on success, -1 on error.
1481  */
1482 static int update_cdp_prep(
1483     rrd_t *rrd,
1484     unsigned long elapsed_pdp_st,
1485     unsigned long start_pdp_offset,
1486     unsigned long *rra_step_cnt,
1487     int rra_idx,
1488     rrd_value_t *pdp_temp,
1489     rrd_value_t *last_seasonal_coef,
1490     rrd_value_t *seasonal_coef,
1491     int current_cf)
1493     unsigned long ds_idx, cdp_idx;
1495     /* update CDP_PREP areas */
1496     /* loop over data soures within each RRA */
1497     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1499         cdp_idx = rra_idx * rrd->stat_head->ds_cnt + ds_idx;
1501         if (rrd->rra_def[rra_idx].pdp_cnt > 1) {
1502             update_cdp(rrd->cdp_prep[cdp_idx].scratch, current_cf,
1503                        pdp_temp[ds_idx], rra_step_cnt[rra_idx],
1504                        elapsed_pdp_st, start_pdp_offset,
1505                        rrd->rra_def[rra_idx].pdp_cnt,
1506                        rrd->rra_def[rra_idx].par[RRA_cdp_xff_val].u_val,
1507                        rra_idx, ds_idx);
1508         } else {
1509             /* Nothing to consolidate if there's one PDP per CDP. However, if
1510              * we've missed some PDPs, let's update null counters etc. */
1511             if (elapsed_pdp_st > 2) {
1512                 reset_cdp(rrd, elapsed_pdp_st, pdp_temp, last_seasonal_coef,
1513                           seasonal_coef, rra_idx, ds_idx, cdp_idx,
1514                           (enum cf_en)current_cf);
1515             }
1516         }
1518         if (rrd_test_error())
1519             return -1;
1520     }                   /* endif data sources loop */
1521     return 0;
1524 /*
1525  * Given the new reading (pdp_temp_val), update or initialize the CDP value,
1526  * primary value, secondary value, and # of unknowns.
1527  */
1528 static void update_cdp(
1529     unival *scratch,
1530     int current_cf,
1531     rrd_value_t pdp_temp_val,
1532     unsigned long rra_step_cnt,
1533     unsigned long elapsed_pdp_st,
1534     unsigned long start_pdp_offset,
1535     unsigned long pdp_cnt,
1536     rrd_value_t xff,
1537     int i,
1538     int ii)
1540     /* shorthand variables */
1541     rrd_value_t *cdp_val = &scratch[CDP_val].u_val;
1542     rrd_value_t *cdp_primary_val = &scratch[CDP_primary_val].u_val;
1543     rrd_value_t *cdp_secondary_val = &scratch[CDP_secondary_val].u_val;
1544     unsigned long *cdp_unkn_pdp_cnt = &scratch[CDP_unkn_pdp_cnt].u_cnt;
1546     if (rra_step_cnt) {
1547         /* If we are in this block, as least 1 CDP value will be written to
1548          * disk, this is the CDP_primary_val entry. If more than 1 value needs
1549          * to be written, then the "fill in" value is the CDP_secondary_val
1550          * entry. */
1551         if (isnan(pdp_temp_val)) {
1552             *cdp_unkn_pdp_cnt += start_pdp_offset;
1553             *cdp_secondary_val = DNAN;
1554         } else {
1555             /* CDP_secondary value is the RRA "fill in" value for intermediary
1556              * CDP data entries. No matter the CF, the value is the same because
1557              * the average, max, min, and last of a list of identical values is
1558              * the same, namely, the value itself. */
1559             *cdp_secondary_val = pdp_temp_val;
1560         }
1562         if (*cdp_unkn_pdp_cnt > pdp_cnt * xff) {
1563             *cdp_primary_val = DNAN;
1564             if (current_cf == CF_AVERAGE) {
1565                 *cdp_val =
1566                     initialize_average_carry_over(pdp_temp_val,
1567                                                   elapsed_pdp_st,
1568                                                   start_pdp_offset, pdp_cnt);
1569             } else {
1570                 *cdp_val = pdp_temp_val;
1571             }
1572         } else {
1573             initialize_cdp_val(scratch, current_cf, pdp_temp_val,
1574                                elapsed_pdp_st, start_pdp_offset, pdp_cnt);
1575         }               /* endif meets xff value requirement for a valid value */
1576         /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
1577          * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1578         if (isnan(pdp_temp_val))
1579             *cdp_unkn_pdp_cnt = (elapsed_pdp_st - start_pdp_offset) % pdp_cnt;
1580         else
1581             *cdp_unkn_pdp_cnt = 0;
1582     } else {            /* rra_step_cnt[i]  == 0 */
1584 #ifdef DEBUG
1585         if (isnan(*cdp_val)) {
1586             fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, DNAN\n",
1587                     i, ii);
1588         } else {
1589             fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, %10.2f\n",
1590                     i, ii, *cdp_val);
1591         }
1592 #endif
1593         if (isnan(pdp_temp_val)) {
1594             *cdp_unkn_pdp_cnt += elapsed_pdp_st;
1595         } else {
1596             *cdp_val =
1597                 calculate_cdp_val(*cdp_val, pdp_temp_val, elapsed_pdp_st,
1598                                   current_cf, i, ii);
1599         }
1600     }
1603 /*
1604  * Set the CDP_primary_val and CDP_val to the appropriate initial value based
1605  * on the type of consolidation function.
1606  */
1607 static void initialize_cdp_val(
1608     unival *scratch,
1609     int current_cf,
1610     rrd_value_t pdp_temp_val,
1611     unsigned long elapsed_pdp_st,
1612     unsigned long start_pdp_offset,
1613     unsigned long pdp_cnt)
1615     rrd_value_t cum_val, cur_val;
1617     switch (current_cf) {
1618     case CF_AVERAGE:
1619         cum_val = IFDNAN(scratch[CDP_val].u_val, 0.0);
1620         cur_val = IFDNAN(pdp_temp_val, 0.0);
1621         scratch[CDP_primary_val].u_val =
1622             (cum_val + cur_val * start_pdp_offset) /
1623             (pdp_cnt - scratch[CDP_unkn_pdp_cnt].u_cnt);
1624         scratch[CDP_val].u_val =
1625             initialize_average_carry_over(pdp_temp_val, elapsed_pdp_st,
1626                                           start_pdp_offset, pdp_cnt);
1627         break;
1628     case CF_MAXIMUM:
1629         cum_val = IFDNAN(scratch[CDP_val].u_val, -DINF);
1630         cur_val = IFDNAN(pdp_temp_val, -DINF);
1631 #if 0
1632 #ifdef DEBUG
1633         if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1634             fprintf(stderr,
1635                     "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1636                     i, ii);
1637             exit(-1);
1638         }
1639 #endif
1640 #endif
1641         if (cur_val > cum_val)
1642             scratch[CDP_primary_val].u_val = cur_val;
1643         else
1644             scratch[CDP_primary_val].u_val = cum_val;
1645         /* initialize carry over value */
1646         scratch[CDP_val].u_val = pdp_temp_val;
1647         break;
1648     case CF_MINIMUM:
1649         cum_val = IFDNAN(scratch[CDP_val].u_val, DINF);
1650         cur_val = IFDNAN(pdp_temp_val, DINF);
1651 #if 0
1652 #ifdef DEBUG
1653         if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1654             fprintf(stderr,
1655                     "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!", i,
1656                     ii);
1657             exit(-1);
1658         }
1659 #endif
1660 #endif
1661         if (cur_val < cum_val)
1662             scratch[CDP_primary_val].u_val = cur_val;
1663         else
1664             scratch[CDP_primary_val].u_val = cum_val;
1665         /* initialize carry over value */
1666         scratch[CDP_val].u_val = pdp_temp_val;
1667         break;
1668     case CF_LAST:
1669     default:
1670         scratch[CDP_primary_val].u_val = pdp_temp_val;
1671         /* initialize carry over value */
1672         scratch[CDP_val].u_val = pdp_temp_val;
1673         break;
1674     }
1677 /*
1678  * Update the consolidation function for Holt-Winters functions as
1679  * well as other functions that don't actually consolidate multiple
1680  * PDPs.
1681  */
1682 static void reset_cdp(
1683     rrd_t *rrd,
1684     unsigned long elapsed_pdp_st,
1685     rrd_value_t *pdp_temp,
1686     rrd_value_t *last_seasonal_coef,
1687     rrd_value_t *seasonal_coef,
1688     int rra_idx,
1689     int ds_idx,
1690     int cdp_idx,
1691     enum cf_en current_cf)
1693     unival   *scratch = rrd->cdp_prep[cdp_idx].scratch;
1695     switch (current_cf) {
1696     case CF_AVERAGE:
1697     default:
1698         scratch[CDP_primary_val].u_val = pdp_temp[ds_idx];
1699         scratch[CDP_secondary_val].u_val = pdp_temp[ds_idx];
1700         break;
1701     case CF_SEASONAL:
1702     case CF_DEVSEASONAL:
1703         /* need to update cached seasonal values, so they are consistent
1704          * with the bulk update */
1705         /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1706          * CDP_last_deviation are the same. */
1707         scratch[CDP_hw_last_seasonal].u_val = last_seasonal_coef[ds_idx];
1708         scratch[CDP_hw_seasonal].u_val = seasonal_coef[ds_idx];
1709         break;
1710     case CF_HWPREDICT:
1711     case CF_MHWPREDICT:
1712         /* need to update the null_count and last_null_count.
1713          * even do this for non-DNAN pdp_temp because the
1714          * algorithm is not learning from batch updates. */
1715         scratch[CDP_null_count].u_cnt += elapsed_pdp_st;
1716         scratch[CDP_last_null_count].u_cnt += elapsed_pdp_st - 1;
1717         /* fall through */
1718     case CF_DEVPREDICT:
1719         scratch[CDP_primary_val].u_val = DNAN;
1720         scratch[CDP_secondary_val].u_val = DNAN;
1721         break;
1722     case CF_FAILURES:
1723         /* do not count missed bulk values as failures */
1724         scratch[CDP_primary_val].u_val = 0;
1725         scratch[CDP_secondary_val].u_val = 0;
1726         /* need to reset violations buffer.
1727          * could do this more carefully, but for now, just
1728          * assume a bulk update wipes away all violations. */
1729         erase_violations(rrd, cdp_idx, rra_idx);
1730         break;
1731     }
1734 static rrd_value_t initialize_average_carry_over(
1735     rrd_value_t pdp_temp_val,
1736     unsigned long elapsed_pdp_st,
1737     unsigned long start_pdp_offset,
1738     unsigned long pdp_cnt)
1740     /* initialize carry over value */
1741     if (isnan(pdp_temp_val)) {
1742         return DNAN;
1743     }
1744     return pdp_temp_val * ((elapsed_pdp_st - start_pdp_offset) % pdp_cnt);
1747 /*
1748  * Update or initialize a CDP value based on the consolidation
1749  * function.
1750  *
1751  * Returns the new value.
1752  */
1753 static rrd_value_t calculate_cdp_val(
1754     rrd_value_t cdp_val,
1755     rrd_value_t pdp_temp_val,
1756     unsigned long elapsed_pdp_st,
1757     int current_cf,
1758 #ifdef DEBUG
1759     int i,
1760     int ii
1761 #else
1762     int UNUSED(i),
1763     int UNUSED(ii)
1764 #endif
1765     )
1767     if (isnan(cdp_val)) {
1768         if (current_cf == CF_AVERAGE) {
1769             pdp_temp_val *= elapsed_pdp_st;
1770         }
1771 #ifdef DEBUG
1772         fprintf(stderr, "Initialize CDP_val for RRA %d DS %d: %10.2f\n",
1773                 i, ii, pdp_temp_val);
1774 #endif
1775         return pdp_temp_val;
1776     }
1777     if (current_cf == CF_AVERAGE)
1778         return cdp_val + pdp_temp_val * elapsed_pdp_st;
1779     if (current_cf == CF_MINIMUM)
1780         return (pdp_temp_val < cdp_val) ? pdp_temp_val : cdp_val;
1781     if (current_cf == CF_MAXIMUM)
1782         return (pdp_temp_val > cdp_val) ? pdp_temp_val : cdp_val;
1784     return pdp_temp_val;
1787 /*
1788  * For each RRA, update the seasonal values and then call update_aberrant_CF
1789  * for each data source.
1790  *
1791  * Return 0 on success, -1 on error.
1792  */
1793 static int update_aberrant_cdps(
1794     rrd_t *rrd,
1795     rrd_file_t *rrd_file,
1796     unsigned long rra_begin,
1797     unsigned long elapsed_pdp_st,
1798     rrd_value_t *pdp_temp,
1799     rrd_value_t **seasonal_coef)
1801     unsigned long rra_idx, ds_idx, j;
1803     /* number of PDP steps since the last update that
1804      * are assigned to the first CDP to be generated
1805      * since the last update. */
1806     unsigned short scratch_idx;
1807     unsigned long rra_start;
1808     enum cf_en current_cf;
1810     /* this loop is only entered if elapsed_pdp_st < 3 */
1811     for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1812          j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val) {
1813         rra_start = rra_begin;
1814         for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1815             if (rrd->rra_def[rra_idx].pdp_cnt == 1) {
1816                 current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1817                 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1818                     if (scratch_idx == CDP_primary_val) {
1819                         lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1820                                         elapsed_pdp_st + 1, seasonal_coef);
1821                     } else {
1822                         lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1823                                         elapsed_pdp_st + 2, seasonal_coef);
1824                     }
1825                 }
1826                 if (rrd_test_error())
1827                     return -1;
1828                 /* loop over data soures within each RRA */
1829                 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1830                     update_aberrant_CF(rrd, pdp_temp[ds_idx], current_cf,
1831                                        rra_idx * (rrd->stat_head->ds_cnt) +
1832                                        ds_idx, rra_idx, ds_idx, scratch_idx,
1833                                        *seasonal_coef);
1834                 }
1835             }
1836             rra_start += rrd->rra_def[rra_idx].row_cnt
1837                 * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
1838         }
1839     }
1840     return 0;
1843 /* 
1844  * Move sequentially through the file, writing one RRA at a time.  Note this
1845  * architecture divorces the computation of CDP with flushing updated RRA
1846  * entries to disk.
1847  *
1848  * Return 0 on success, -1 on error.
1849  */
1850 static int write_to_rras(
1851     rrd_t *rrd,
1852     rrd_file_t *rrd_file,
1853     unsigned long *rra_step_cnt,
1854     unsigned long rra_begin,
1855     time_t current_time,
1856     unsigned long *skip_update,
1857     rrd_info_t ** pcdp_summary)
1859     unsigned long rra_idx;
1860     unsigned long rra_start;
1861     time_t    rra_time = 0; /* time of update for a RRA */
1863     unsigned long ds_cnt = rrd->stat_head->ds_cnt;
1864     
1865     /* Ready to write to disk */
1866     rra_start = rra_begin;
1868     for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1869         rra_def_t *rra_def = &rrd->rra_def[rra_idx];
1870         rra_ptr_t *rra_ptr = &rrd->rra_ptr[rra_idx];
1872         /* for cdp_prep */
1873         unsigned short scratch_idx;
1874         unsigned long step_subtract;
1876         for (scratch_idx = CDP_primary_val,
1877                  step_subtract = 1;
1878              rra_step_cnt[rra_idx] > 0;
1879              rra_step_cnt[rra_idx]--,
1880                  scratch_idx = CDP_secondary_val,
1881                  step_subtract = 2) {
1883             size_t rra_pos_new;
1884 #ifdef DEBUG
1885             fprintf(stderr, "  -- RRA Preseek %ld\n", rrd_file->pos);
1886 #endif
1887             /* increment, with wrap-around */
1888             if (++rra_ptr->cur_row >= rra_def->row_cnt)
1889               rra_ptr->cur_row = 0;
1891             /* we know what our position should be */
1892             rra_pos_new = rra_start
1893               + ds_cnt * rra_ptr->cur_row * sizeof(rrd_value_t);
1895             /* re-seek if the position is wrong or we wrapped around */
1896             if (rra_pos_new != rrd_file->pos) {
1897                 if (rrd_seek(rrd_file, rra_pos_new, SEEK_SET) != 0) {
1898                     rrd_set_error("seek error in rrd");
1899                     return -1;
1900                 }
1901             }
1902 #ifdef DEBUG
1903             fprintf(stderr, "  -- RRA Postseek %ld\n", rrd_file->pos);
1904 #endif
1906             if (skip_update[rra_idx])
1907                 continue;
1909             if (*pcdp_summary != NULL) {
1910                 unsigned long step_time = rra_def->pdp_cnt * rrd->stat_head->pdp_step;
1912                 rra_time = (current_time - current_time % step_time)
1913                     - ((rra_step_cnt[rra_idx] - step_subtract) * step_time);
1914             }
1916             if (write_RRA_row
1917                 (rrd_file, rrd, rra_idx, scratch_idx,
1918                  pcdp_summary, rra_time) == -1)
1919                 return -1;
1920         }
1922         rra_start += rra_def->row_cnt * ds_cnt * sizeof(rrd_value_t);
1923     } /* RRA LOOP */
1925     return 0;
1928 /*
1929  * Write out one row of values (one value per DS) to the archive.
1930  *
1931  * Returns 0 on success, -1 on error.
1932  */
1933 static int write_RRA_row(
1934     rrd_file_t *rrd_file,
1935     rrd_t *rrd,
1936     unsigned long rra_idx,
1937     unsigned short CDP_scratch_idx,
1938     rrd_info_t ** pcdp_summary,
1939     time_t rra_time)
1941     unsigned long ds_idx, cdp_idx;
1942     rrd_infoval_t iv;
1944     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1945         /* compute the cdp index */
1946         cdp_idx = rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
1947 #ifdef DEBUG
1948         fprintf(stderr, "  -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1949                 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,
1950                 rrd_file->pos, rrd->rra_def[rra_idx].cf_nam);
1951 #endif
1952         if (*pcdp_summary != NULL) {
1953             iv.u_val = rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
1954             /* append info to the return hash */
1955             *pcdp_summary = rrd_info_push(*pcdp_summary,
1956                                           sprintf_alloc
1957                                           ("[%lli]RRA[%s][%lu]DS[%s]", (long long)rra_time,
1958                                            rrd->rra_def[rra_idx].cf_nam,
1959                                            rrd->rra_def[rra_idx].pdp_cnt,
1960                                            rrd->ds_def[ds_idx].ds_nam),
1961                                           RD_I_VAL, iv);
1962         }
1963         errno = 0;
1964         if (rrd_write(rrd_file,
1965                       &(rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].
1966                         u_val), sizeof(rrd_value_t)) != sizeof(rrd_value_t)) {
1967             rrd_set_error("writing rrd: %s", rrd_strerror(errno));
1968             return -1;
1969         }
1970     }
1971     return 0;
1974 /*
1975  * Call apply_smoother for all DEVSEASONAL and SEASONAL RRAs.
1976  *
1977  * Returns 0 on success, -1 otherwise
1978  */
1979 static int smooth_all_rras(
1980     rrd_t *rrd,
1981     rrd_file_t *rrd_file,
1982     unsigned long rra_begin)
1984     unsigned long rra_start = rra_begin;
1985     unsigned long rra_idx;
1987     for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; ++rra_idx) {
1988         if (cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_DEVSEASONAL ||
1989             cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_SEASONAL) {
1990 #ifdef DEBUG
1991             fprintf(stderr, "Running smoother for rra %lu\n", rra_idx);
1992 #endif
1993             apply_smoother(rrd, rra_idx, rra_start, rrd_file);
1994             if (rrd_test_error())
1995                 return -1;
1996         }
1997         rra_start += rrd->rra_def[rra_idx].row_cnt
1998             * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
1999     }
2000     return 0;
2003 #ifndef HAVE_MMAP
2004 /*
2005  * Flush changes to disk (unless we're using mmap)
2006  *
2007  * Returns 0 on success, -1 otherwise
2008  */
2009 static int write_changes_to_disk(
2010     rrd_t *rrd,
2011     rrd_file_t *rrd_file,
2012     int version)
2014     /* we just need to write back the live header portion now */
2015     if (rrd_seek(rrd_file, (sizeof(stat_head_t)
2016                             + sizeof(ds_def_t) * rrd->stat_head->ds_cnt
2017                             + sizeof(rra_def_t) * rrd->stat_head->rra_cnt),
2018                  SEEK_SET) != 0) {
2019         rrd_set_error("seek rrd for live header writeback");
2020         return -1;
2021     }
2022     if (version >= 3) {
2023         if (rrd_write(rrd_file, rrd->live_head,
2024                       sizeof(live_head_t) * 1) != sizeof(live_head_t) * 1) {
2025             rrd_set_error("rrd_write live_head to rrd");
2026             return -1;
2027         }
2028     } else {
2029         if (rrd_write(rrd_file, rrd->legacy_last_up,
2030                       sizeof(time_t) * 1) != sizeof(time_t) * 1) {
2031             rrd_set_error("rrd_write live_head to rrd");
2032             return -1;
2033         }
2034     }
2037     if (rrd_write(rrd_file, rrd->pdp_prep,
2038                   sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)
2039         != (ssize_t) (sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)) {
2040         rrd_set_error("rrd_write pdp_prep to rrd");
2041         return -1;
2042     }
2044     if (rrd_write(rrd_file, rrd->cdp_prep,
2045                   sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2046                   rrd->stat_head->ds_cnt)
2047         != (ssize_t) (sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2048                       rrd->stat_head->ds_cnt)) {
2050         rrd_set_error("rrd_write cdp_prep to rrd");
2051         return -1;
2052     }
2054     if (rrd_write(rrd_file, rrd->rra_ptr,
2055                   sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)
2056         != (ssize_t) (sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)) {
2057         rrd_set_error("rrd_write rra_ptr to rrd");
2058         return -1;
2059     }
2060     return 0;
2062 #endif