Code

this should help with the memory leak
[rrdtool-all.git] / program / src / rrd_update.c
1 /*****************************************************************************
2  * RRDtool 1.2.99907080300  Copyright by Tobi Oetiker, 1997-2007
3  *****************************************************************************
4  * rrd_update.c  RRD Update Function
5  *****************************************************************************
6  * $Id$
7  *****************************************************************************/
9 #include "rrd_tool.h"
11 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
12 #include <sys/locking.h>
13 #include <sys/stat.h>
14 #include <io.h>
15 #endif
17 #include <locale.h>
19 #include "rrd_hw.h"
20 #include "rrd_rpncalc.h"
22 #include "rrd_is_thread_safe.h"
23 #include "unused.h"
25 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
26 /*
27  * WIN32 does not have gettimeofday     and struct timeval. This is a quick and dirty
28  * replacement.
29  */
30 #include <sys/timeb.h>
32 #ifndef __MINGW32__
33 struct timeval {
34     time_t    tv_sec;   /* seconds */
35     long      tv_usec;  /* microseconds */
36 };
37 #endif
39 struct __timezone {
40     int       tz_minuteswest;   /* minutes W of Greenwich */
41     int       tz_dsttime;   /* type of dst correction */
42 };
44 static int gettimeofday(
45     struct timeval *t,
46     struct __timezone *tz)
47 {
49     struct _timeb current_time;
51     _ftime(&current_time);
53     t->tv_sec = current_time.time;
54     t->tv_usec = current_time.millitm * 1000;
56     return 0;
57 }
59 #endif
61 /* FUNCTION PROTOTYPES */
63 int       rrd_update_r(
64     const char *filename,
65     const char *tmplt,
66     int argc,
67     const char **argv);
68 int       _rrd_update(
69     const char *filename,
70     const char *tmplt,
71     int argc,
72     const char **argv,
73     info_t *);
75 static int allocate_data_structures(
76     rrd_t *rrd,
77     char ***updvals,
78     rrd_value_t **pdp_temp,
79     const char *tmplt,
80     long **tmpl_idx,
81     unsigned long *tmpl_cnt,
82     unsigned long **rra_step_cnt,
83     unsigned long **skip_update,
84     rrd_value_t **pdp_new);
86 static int parse_template(
87     rrd_t *rrd,
88     const char *tmplt,
89     unsigned long *tmpl_cnt,
90     long *tmpl_idx);
92 static int process_arg(
93     char *step_start,
94     rrd_t *rrd,
95     rrd_file_t *rrd_file,
96     unsigned long rra_begin,
97     unsigned long *rra_current,
98     time_t *current_time,
99     unsigned long *current_time_usec,
100     rrd_value_t *pdp_temp,
101     rrd_value_t *pdp_new,
102     unsigned long *rra_step_cnt,
103     char **updvals,
104     long *tmpl_idx,
105     unsigned long tmpl_cnt,
106     info_t **pcdp_summary,
107     int version,
108     unsigned long *skip_update,
109     int *schedule_smooth);
111 static int parse_ds(
112     rrd_t *rrd,
113     char **updvals,
114     long *tmpl_idx,
115     char *input,
116     unsigned long tmpl_cnt,
117     time_t *current_time,
118     unsigned long *current_time_usec,
119     int version);
121 static int get_time_from_reading(
122     rrd_t *rrd,
123     char timesyntax,
124     char **updvals,
125     time_t *current_time,
126     unsigned long *current_time_usec,
127     int version);
129 static int update_pdp_prep(
130     rrd_t *rrd,
131     char **updvals,
132     rrd_value_t *pdp_new,
133     double interval);
135 static int calculate_elapsed_steps(
136     rrd_t *rrd,
137     unsigned long current_time,
138     unsigned long current_time_usec,
139     double interval,
140     double *pre_int,
141     double *post_int,
142     unsigned long *proc_pdp_cnt);
144 static void simple_update(
145     rrd_t *rrd,
146     double interval,
147     rrd_value_t *pdp_new);
149 static int process_all_pdp_st(
150     rrd_t *rrd,
151     double interval,
152     double pre_int,
153     double post_int,
154     unsigned long elapsed_pdp_st,
155     rrd_value_t *pdp_new,
156     rrd_value_t *pdp_temp);
158 static int process_pdp_st(
159     rrd_t *rrd,
160     unsigned long ds_idx,
161     double interval,
162     double pre_int,
163     double post_int,
164     long diff_pdp_st,
165     rrd_value_t *pdp_new,
166     rrd_value_t *pdp_temp);
168 static int update_all_cdp_prep(
169     rrd_t *rrd,
170     unsigned long *rra_step_cnt,
171     unsigned long rra_begin,
172     rrd_file_t *rrd_file,
173     unsigned long elapsed_pdp_st,
174     unsigned long proc_pdp_cnt,
175     rrd_value_t **last_seasonal_coef,
176     rrd_value_t **seasonal_coef,
177     rrd_value_t *pdp_temp,
178     unsigned long *rra_current,
179     unsigned long *skip_update,
180     int *schedule_smooth);
182 static int do_schedule_smooth(
183     rrd_t *rrd,
184     unsigned long rra_idx,
185     unsigned long elapsed_pdp_st);
187 static int update_cdp_prep(
188     rrd_t *rrd,
189     unsigned long elapsed_pdp_st,
190     unsigned long start_pdp_offset,
191     unsigned long *rra_step_cnt,
192     int rra_idx,
193     rrd_value_t *pdp_temp,
194     rrd_value_t *last_seasonal_coef,
195     rrd_value_t *seasonal_coef,
196     int current_cf);
198 static void update_cdp(
199     unival *scratch,
200     int current_cf,
201     rrd_value_t pdp_temp_val,
202     unsigned long rra_step_cnt,
203     unsigned long elapsed_pdp_st,
204     unsigned long start_pdp_offset,
205     unsigned long pdp_cnt,
206     rrd_value_t xff,
207     int i,
208     int ii);
210 static void initialize_cdp_val(
211     unival *scratch,
212     int current_cf,
213     rrd_value_t pdp_temp_val,
214     unsigned long elapsed_pdp_st,
215     unsigned long start_pdp_offset,
216     unsigned long pdp_cnt);
218 static void reset_cdp(
219     rrd_t *rrd,
220     unsigned long elapsed_pdp_st,
221     rrd_value_t *pdp_temp,
222     rrd_value_t *last_seasonal_coef,
223     rrd_value_t *seasonal_coef,
224     int rra_idx,
225     int ds_idx,
226     int cdp_idx,
227     enum cf_en current_cf);
229 static rrd_value_t initialize_average_carry_over(
230     rrd_value_t pdp_temp_val,
231     unsigned long elapsed_pdp_st,
232     unsigned long start_pdp_offset,
233     unsigned long pdp_cnt);
235 static rrd_value_t calculate_cdp_val(
236     rrd_value_t cdp_val,
237     rrd_value_t pdp_temp_val,
238     unsigned long elapsed_pdp_st,
239     int current_cf,
240     int i,
241     int ii);
243 static int update_aberrant_cdps(
244     rrd_t *rrd,
245     rrd_file_t *rrd_file,
246     unsigned long rra_begin,
247     unsigned long *rra_current,
248     unsigned long elapsed_pdp_st,
249     rrd_value_t *pdp_temp,
250     rrd_value_t **seasonal_coef);
252 static int write_to_rras(
253     rrd_t *rrd,
254     rrd_file_t *rrd_file,
255     unsigned long *rra_step_cnt,
256     unsigned long rra_begin,
257     unsigned long *rra_current,
258     time_t current_time,
259     unsigned long *skip_update,
260     info_t **pcdp_summary);
262 static int write_RRA_row(
263     rrd_file_t *rrd_file,
264     rrd_t *rrd,
265     unsigned long rra_idx,
266     unsigned long *rra_current,
267     unsigned short CDP_scratch_idx,
268     info_t **pcdp_summary,
269     time_t rra_time);
271 static int smooth_all_rras(
272     rrd_t *rrd,
273     rrd_file_t *rrd_file,
274     unsigned long rra_begin);
276 #ifndef HAVE_MMAP
277 static int write_changes_to_disk(
278     rrd_t *rrd,
279     rrd_file_t *rrd_file,
280     int version);
281 #endif
283 /*
284  * normalize time as returned by gettimeofday. usec part must
285  * be always >= 0
286  */
287 static inline void normalize_time(
288     struct timeval *t)
290     if (t->tv_usec < 0) {
291         t->tv_sec--;
292         t->tv_usec += 1e6L;
293     }
296 /*
297  * Sets current_time and current_time_usec based on the current time.
298  * current_time_usec is set to 0 if the version number is 1 or 2.
299  */
300 static inline void initialize_time(
301     time_t *current_time,
302     unsigned long *current_time_usec,
303     int version)
305     struct timeval tmp_time;    /* used for time conversion */
307     gettimeofday(&tmp_time, 0);
308     normalize_time(&tmp_time);
309     *current_time = tmp_time.tv_sec;
310     if (version >= 3) {
311         *current_time_usec = tmp_time.tv_usec;
312     } else {
313         *current_time_usec = 0;
314     }
317 #define IFDNAN(X,Y) (isnan(X) ? (Y) : (X));
319 info_t   *rrd_update_v(
320     int argc,
321     char **argv)
323     char     *tmplt = NULL;
324     info_t   *result = NULL;
325     infoval   rc;
326     struct option long_options[] = {
327         {"template", required_argument, 0, 't'},
328         {0, 0, 0, 0}
329     };
331     rc.u_int = -1;
332     optind = 0;
333     opterr = 0;         /* initialize getopt */
335     while (1) {
336         int       option_index = 0;
337         int       opt;
339         opt = getopt_long(argc, argv, "t:", long_options, &option_index);
341         if (opt == EOF)
342             break;
344         switch (opt) {
345         case 't':
346             tmplt = optarg;
347             break;
349         case '?':
350             rrd_set_error("unknown option '%s'", argv[optind - 1]);
351             goto end_tag;
352         }
353     }
355     /* need at least 2 arguments: filename, data. */
356     if (argc - optind < 2) {
357         rrd_set_error("Not enough arguments");
358         goto end_tag;
359     }
360     rc.u_int = 0;
361     result = info_push(NULL, sprintf_alloc("return_value"), RD_I_INT, rc);
362     rc.u_int = _rrd_update(argv[optind], tmplt,
363                            argc - optind - 1,
364                            (const char **) (argv + optind + 1), result);
365     result->value.u_int = rc.u_int;
366   end_tag:
367     return result;
370 int rrd_update(
371     int argc,
372     char **argv)
374     struct option long_options[] = {
375         {"template", required_argument, 0, 't'},
376         {0, 0, 0, 0}
377     };
378     int       option_index = 0;
379     int       opt;
380     char     *tmplt = NULL;
381     int       rc = -1;
383     optind = 0;
384     opterr = 0;         /* initialize getopt */
386     while (1) {
387         opt = getopt_long(argc, argv, "t:", long_options, &option_index);
389         if (opt == EOF)
390             break;
392         switch (opt) {
393         case 't':
394             tmplt = strdup(optarg);
395             break;
397         case '?':
398             rrd_set_error("unknown option '%s'", argv[optind - 1]);
399             goto out;
400         }
401     }
403     /* need at least 2 arguments: filename, data. */
404     if (argc - optind < 2) {
405         rrd_set_error("Not enough arguments");
406         goto out;
407     }
409     rc = rrd_update_r(argv[optind], tmplt,
410                       argc - optind - 1, (const char **) (argv + optind + 1));
411   out:
412     free(tmplt);
413     return rc;
416 int rrd_update_r(
417     const char *filename,
418     const char *tmplt,
419     int argc,
420     const char **argv)
422     return _rrd_update(filename, tmplt, argc, argv, NULL);
425 int _rrd_update(
426     const char *filename,
427     const char *tmplt,
428     int argc,
429     const char **argv,
430     info_t *pcdp_summary)
433     int       arg_i = 2;
435     unsigned long rra_begin;    /* byte pointer to the rra
436                                  * area in the rrd file.  this
437                                  * pointer never changes value */
438     unsigned long rra_current;  /* byte pointer to the current write
439                                  * spot in the rrd file. */
440     rrd_value_t *pdp_new;   /* prepare the incoming data to be added 
441                              * to the existing entry */
442     rrd_value_t *pdp_temp;  /* prepare the pdp values to be added 
443                              * to the cdp values */
445     long     *tmpl_idx; /* index representing the settings
446                          * transported by the tmplt index */
447     unsigned long tmpl_cnt = 2; /* time and data */
448     rrd_t     rrd;
449     time_t    current_time = 0;
450     unsigned long current_time_usec = 0;    /* microseconds part of current time */
451     char    **updvals;
452     int       schedule_smooth = 0;
454     /* number of elapsed PDP steps since last update */
455     unsigned long *rra_step_cnt = NULL;
457     int       version;  /* rrd version */
458     rrd_file_t *rrd_file;
459     char     *arg_copy; /* for processing the argv */
460     unsigned long *skip_update; /* RRAs to advance but not write */
462     /* need at least 1 arguments: data. */
463     if (argc < 1) {
464         rrd_set_error("Not enough arguments");
465         goto err_out;
466     }
468     if ((rrd_file = rrd_open(filename, &rrd, RRD_READWRITE)) == NULL) {
469         goto err_free;
470     }
471     /* We are now at the beginning of the rra's */
472     rra_current = rra_begin = rrd_file->header_len;
474     version = atoi(rrd.stat_head->version);
476     initialize_time(&current_time, &current_time_usec, version);
478     /* get exclusive lock to whole file.
479      * lock gets removed when we close the file.
480      */
481     if (LockRRD(rrd_file->fd) != 0) {
482         rrd_set_error("could not lock RRD");
483         goto err_close;
484     }
486     if (allocate_data_structures(&rrd, &updvals,
487                                  &pdp_temp, tmplt, &tmpl_idx, &tmpl_cnt,
488                                  &rra_step_cnt, &skip_update,
489                                  &pdp_new) == -1) {
490         goto err_close;
491     }
493     /* loop through the arguments. */
494     for (arg_i = 0; arg_i < argc; arg_i++) {
495         if ((arg_copy = strdup(argv[arg_i])) == NULL) {
496             rrd_set_error("failed duplication argv entry");
497             break;
498         }
499         if (process_arg(arg_copy, &rrd, rrd_file, rra_begin, &rra_current,
500                         &current_time, &current_time_usec, pdp_temp, pdp_new,
501                         rra_step_cnt, updvals, tmpl_idx, tmpl_cnt,
502                         &pcdp_summary, version, skip_update,
503                         &schedule_smooth) == -1) {
504             free(arg_copy);
505             break;
506         }
507         free(arg_copy);
508     }
510     free(rra_step_cnt);
512     /* if we got here and if there is an error and if the file has not been
513      * written to, then close things up and return. */
514     if (rrd_test_error()) {
515         goto err_free_structures;
516     }
517 #ifndef HAVE_MMAP
518     if (write_changes_to_disk(&rrd, rrd_file, version) == -1) {
519         goto err_free_structures;
520     }
521 #endif
523     /* calling the smoothing code here guarantees at most one smoothing
524      * operation per rrd_update call. Unfortunately, it is possible with bulk
525      * updates, or a long-delayed update for smoothing to occur off-schedule.
526      * This really isn't critical except during the burn-in cycles. */
527     if (schedule_smooth) {
528         smooth_all_rras(&rrd, rrd_file, rra_begin);
529     }
531 /*    rrd_dontneed(rrd_file,&rrd); */
532     rrd_free(&rrd);
533     rrd_close(rrd_file);
535     free(pdp_new);
536     free(tmpl_idx);
537     free(pdp_temp);
538     free(skip_update);
539     free(updvals);
540     return 0;
542   err_free_structures:
543     free(pdp_new);
544     free(tmpl_idx);
545     free(pdp_temp);
546     free(skip_update);
547     free(updvals);
548   err_close:
549     rrd_close(rrd_file);
550   err_free:
551     rrd_free(&rrd);
552   err_out:
553     return -1;
556 /*
557  * get exclusive lock to whole file.
558  * lock gets removed when we close the file
559  *
560  * returns 0 on success
561  */
562 int LockRRD(
563     int in_file)
565     int       rcstat;
567     {
568 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
569         struct _stat st;
571         if (_fstat(in_file, &st) == 0) {
572             rcstat = _locking(in_file, _LK_NBLCK, st.st_size);
573         } else {
574             rcstat = -1;
575         }
576 #else
577         struct flock lock;
579         lock.l_type = F_WRLCK;  /* exclusive write lock */
580         lock.l_len = 0; /* whole file */
581         lock.l_start = 0;   /* start of file */
582         lock.l_whence = SEEK_SET;   /* end of file */
584         rcstat = fcntl(in_file, F_SETLK, &lock);
585 #endif
586     }
588     return (rcstat);
591 /*
592  * Allocate some important arrays used, and initialize the template.
593  *
594  * When it returns, either all of the structures are allocated
595  * or none of them are.
596  *
597  * Returns 0 on success, -1 on error.
598  */
599 static int allocate_data_structures(
600     rrd_t *rrd,
601     char ***updvals,
602     rrd_value_t **pdp_temp,
603     const char *tmplt,
604     long **tmpl_idx,
605     unsigned long *tmpl_cnt,
606     unsigned long **rra_step_cnt,
607     unsigned long **skip_update,
608     rrd_value_t **pdp_new)
610     unsigned  i, ii;
611     if ((*updvals = (char **) malloc(sizeof(char *)
612                                      * (rrd->stat_head->ds_cnt + 1))) == NULL) {
613         rrd_set_error("allocating updvals pointer array.");
614         return -1;
615     }
616     if ((*pdp_temp = (rrd_value_t *) malloc(sizeof(rrd_value_t)
617                                             * rrd->stat_head->ds_cnt)) ==
618         NULL) {
619         rrd_set_error("allocating pdp_temp.");
620         goto err_free_updvals;
621     }
622     if ((*skip_update = (unsigned long *) malloc(sizeof(unsigned long)
623                                                  *
624                                                  rrd->stat_head->rra_cnt)) ==
625         NULL) {
626         rrd_set_error("allocating skip_update.");
627         goto err_free_pdp_temp;
628     }
629     if ((*tmpl_idx = (long *) malloc(sizeof(unsigned long)
630                                      * (rrd->stat_head->ds_cnt + 1))) == NULL) {
631         rrd_set_error("allocating tmpl_idx.");
632         goto err_free_skip_update;
633     }
634     if ((*rra_step_cnt = (unsigned long *) malloc(sizeof(unsigned long)
635                                                   *
636                                                   (rrd->stat_head->
637                                                    rra_cnt))) == NULL) {
638         rrd_set_error("allocating rra_step_cnt.");
639         goto err_free_tmpl_idx;
640     }
642     /* initialize tmplt redirector */
643     /* default config example (assume DS 1 is a CDEF DS)
644        tmpl_idx[0] -> 0; (time)
645        tmpl_idx[1] -> 1; (DS 0)
646        tmpl_idx[2] -> 3; (DS 2)
647        tmpl_idx[3] -> 4; (DS 3) */
648     (*tmpl_idx)[0] = 0; /* time */
649     for (i = 1, ii = 1; i <= rrd->stat_head->ds_cnt; i++) {
650         if (dst_conv(rrd->ds_def[i - 1].dst) != DST_CDEF)
651             (*tmpl_idx)[ii++] = i;
652     }
653     *tmpl_cnt = ii;
655     if (tmplt != NULL) {
656         if (parse_template(rrd, tmplt, tmpl_cnt, *tmpl_idx) == -1) {
657             goto err_free_rra_step_cnt;
658         }
659     }
661     if ((*pdp_new = (rrd_value_t *) malloc(sizeof(rrd_value_t)
662                                            * rrd->stat_head->ds_cnt)) == NULL) {
663         rrd_set_error("allocating pdp_new.");
664         goto err_free_rra_step_cnt;
665     }
667     return 0;
669   err_free_rra_step_cnt:
670     free(*rra_step_cnt);
671   err_free_tmpl_idx:
672     free(*tmpl_idx);
673   err_free_skip_update:
674     free(*skip_update);
675   err_free_pdp_temp:
676     free(*pdp_temp);
677   err_free_updvals:
678     free(*updvals);
679     return -1;
682 /*
683  * Parses tmplt and puts an ordered list of DS's into tmpl_idx.
684  *
685  * Returns 0 on success.
686  */
687 static int parse_template(
688     rrd_t *rrd,
689     const char *tmplt,
690     unsigned long *tmpl_cnt,
691     long *tmpl_idx)
693     char     *dsname, *tmplt_copy;
694     unsigned int tmpl_len, i;
695     int       ret = 0;
697     *tmpl_cnt = 1;      /* the first entry is the time */
699     /* we should work on a writeable copy here */
700     if ((tmplt_copy = strdup(tmplt)) == NULL) {
701         rrd_set_error("error copying tmplt '%s'", tmplt);
702         ret = -1;
703         goto out;
704     }
706     dsname = tmplt_copy;
707     tmpl_len = strlen(tmplt_copy);
708     for (i = 0; i <= tmpl_len; i++) {
709         if (tmplt_copy[i] == ':' || tmplt_copy[i] == '\0') {
710             tmplt_copy[i] = '\0';
711             if (*tmpl_cnt > rrd->stat_head->ds_cnt) {
712                 rrd_set_error("tmplt contains more DS definitions than RRD");
713                 ret = -1;
714                 goto out_free_tmpl_copy;
715             }
716             if ((tmpl_idx[(*tmpl_cnt)++] = ds_match(rrd, dsname) + 1) == 0) {
717                 rrd_set_error("unknown DS name '%s'", dsname);
718                 ret = -1;
719                 goto out_free_tmpl_copy;
720             }
721             /* go to the next entry on the tmplt_copy */
722             if (i < tmpl_len)
723                 dsname = &tmplt_copy[i + 1];
724         }
725     }
726   out_free_tmpl_copy:
727     free(tmplt_copy);
728   out:
729     return ret;
732 /*
733  * Parse an update string, updates the primary data points (PDPs)
734  * and consolidated data points (CDPs), and writes changes to the RRAs.
735  *
736  * Returns 0 on success, -1 on error.
737  */
738 static int process_arg(
739     char *step_start,
740     rrd_t *rrd,
741     rrd_file_t *rrd_file,
742     unsigned long rra_begin,
743     unsigned long *rra_current,
744     time_t *current_time,
745     unsigned long *current_time_usec,
746     rrd_value_t *pdp_temp,
747     rrd_value_t *pdp_new,
748     unsigned long *rra_step_cnt,
749     char **updvals,
750     long *tmpl_idx,
751     unsigned long tmpl_cnt,
752     info_t **pcdp_summary,
753     int version,
754     unsigned long *skip_update,
755     int *schedule_smooth)
757     rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
759     /* a vector of future Holt-Winters seasonal coefs */
760     unsigned long elapsed_pdp_st;
762     double    interval, pre_int, post_int;  /* interval between this and
763                                              * the last run */
764     unsigned long proc_pdp_cnt;
765     unsigned long rra_start;
767     if (parse_ds(rrd, updvals, tmpl_idx, step_start, tmpl_cnt,
768                  current_time, current_time_usec, version) == -1) {
769         return -1;
770     }
771     /* seek to the beginning of the rra's */
772     if (*rra_current != rra_begin) {
773 #ifndef HAVE_MMAP
774         if (rrd_seek(rrd_file, rra_begin, SEEK_SET) != 0) {
775             rrd_set_error("seek error in rrd");
776             return -1;
777         }
778 #endif
779         *rra_current = rra_begin;
780     }
781     rra_start = rra_begin;
783     interval = (double) (*current_time - rrd->live_head->last_up)
784         + (double) ((long) *current_time_usec -
785                     (long) rrd->live_head->last_up_usec) / 1e6f;
787     /* process the data sources and update the pdp_prep 
788      * area accordingly */
789     if (update_pdp_prep(rrd, updvals, pdp_new, interval) == -1) {
790         return -1;
791     }
793     elapsed_pdp_st = calculate_elapsed_steps(rrd,
794                                              *current_time,
795                                              *current_time_usec, interval,
796                                              &pre_int, &post_int,
797                                              &proc_pdp_cnt);
799     /* has a pdp_st moment occurred since the last run ? */
800     if (elapsed_pdp_st == 0) {
801         /* no we have not passed a pdp_st moment. therefore update is simple */
802         simple_update(rrd, interval, pdp_new);
803     } else {
804         /* an pdp_st has occurred. */
805         if (process_all_pdp_st(rrd, interval,
806                                pre_int, post_int,
807                                elapsed_pdp_st, pdp_new, pdp_temp) == -1) {
808             return -1;
809         }
810         if (update_all_cdp_prep(rrd, rra_step_cnt,
811                                 rra_begin, rrd_file,
812                                 elapsed_pdp_st,
813                                 proc_pdp_cnt,
814                                 &last_seasonal_coef,
815                                 &seasonal_coef,
816                                 pdp_temp, rra_current,
817                                 skip_update, schedule_smooth) == -1) {
818             goto err_free_coefficients;
819         }
820         if (update_aberrant_cdps(rrd, rrd_file, rra_begin, rra_current,
821                                  elapsed_pdp_st, pdp_temp,
822                                  &seasonal_coef) == -1) {
823             goto err_free_coefficients;
824         }
825         if (write_to_rras(rrd, rrd_file, rra_step_cnt, rra_begin,
826                           rra_current, *current_time, skip_update,
827                           pcdp_summary) == -1) {
828             goto err_free_coefficients;
829         }
830     }                   /* endif a pdp_st has occurred */
831     rrd->live_head->last_up = *current_time;
832     rrd->live_head->last_up_usec = *current_time_usec;
834     free(seasonal_coef);
835     free(last_seasonal_coef);
836     return 0;
838   err_free_coefficients:
839     free(seasonal_coef);
840     free(last_seasonal_coef);
841     return -1;
844 /*
845  * Parse a DS string (time + colon-separated values), storing the
846  * results in current_time, current_time_usec, and updvals.
847  *
848  * Returns 0 on success, -1 on error.
849  */
850 static int parse_ds(
851     rrd_t *rrd,
852     char **updvals,
853     long *tmpl_idx,
854     char *input,
855     unsigned long tmpl_cnt,
856     time_t *current_time,
857     unsigned long *current_time_usec,
858     int version)
860     char     *p;
861     unsigned long i;
862     char      timesyntax;
864     updvals[0] = input;
865     /* initialize all ds input to unknown except the first one
866        which has always got to be set */
867     for (i = 1; i <= rrd->stat_head->ds_cnt; i++)
868         updvals[i] = "U";
870     /* separate all ds elements; first must be examined separately
871        due to alternate time syntax */
872     if ((p = strchr(input, '@')) != NULL) {
873         timesyntax = '@';
874     } else if ((p = strchr(input, ':')) != NULL) {
875         timesyntax = ':';
876     } else {
877         rrd_set_error("expected timestamp not found in data source from %s",
878                       input);
879         return -1;
880     }
881     *p = '\0';
882     i = 1;
883     updvals[tmpl_idx[i++]] = p + 1;
884     while (*(++p)) {
885         if (*p == ':') {
886             *p = '\0';
887             if (i < tmpl_cnt) {
888                 updvals[tmpl_idx[i++]] = p + 1;
889             }
890         }
891     }
893     if (i != tmpl_cnt) {
894         rrd_set_error("expected %lu data source readings (got %lu) from %s",
895                       tmpl_cnt - 1, i, input);
896         return -1;
897     }
899     if (get_time_from_reading(rrd, timesyntax, updvals,
900                               current_time, current_time_usec,
901                               version) == -1) {
902         return -1;
903     }
904     return 0;
907 /*
908  * Parse the time in a DS string, store it in current_time and 
909  * current_time_usec and verify that it's later than the last
910  * update for this DS.
911  *
912  * Returns 0 on success, -1 on error.
913  */
914 static int get_time_from_reading(
915     rrd_t *rrd,
916     char timesyntax,
917     char **updvals,
918     time_t *current_time,
919     unsigned long *current_time_usec,
920     int version)
922     double    tmp;
923     char     *parsetime_error = NULL;
924     char     *old_locale;
925     struct rrd_time_value ds_tv;
926     struct timeval tmp_time;    /* used for time conversion */
928     /* get the time from the reading ... handle N */
929     if (timesyntax == '@') {    /* at-style */
930         if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
931             rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error);
932             return -1;
933         }
934         if (ds_tv.type == RELATIVE_TO_END_TIME ||
935             ds_tv.type == RELATIVE_TO_START_TIME) {
936             rrd_set_error("specifying time relative to the 'start' "
937                           "or 'end' makes no sense here: %s", updvals[0]);
938             return -1;
939         }
940         *current_time = mktime(&ds_tv.tm) +ds_tv.offset;
941         *current_time_usec = 0; /* FIXME: how to handle usecs here ? */
942     } else if (strcmp(updvals[0], "N") == 0) {
943         gettimeofday(&tmp_time, 0);
944         normalize_time(&tmp_time);
945         *current_time = tmp_time.tv_sec;
946         *current_time_usec = tmp_time.tv_usec;
947     } else {
948         old_locale = setlocale(LC_NUMERIC, "C");
949         tmp = strtod(updvals[0], 0);
950         setlocale(LC_NUMERIC, old_locale);
951         *current_time = floor(tmp);
952         *current_time_usec = (long) ((tmp - (double) *current_time) * 1e6f);
953     }
954     /* dont do any correction for old version RRDs */
955     if (version < 3)
956         *current_time_usec = 0;
958     if (*current_time < rrd->live_head->last_up ||
959         (*current_time == rrd->live_head->last_up &&
960          (long) *current_time_usec <= (long) rrd->live_head->last_up_usec)) {
961         rrd_set_error("illegal attempt to update using time %ld when "
962                       "last update time is %ld (minimum one second step)",
963                       *current_time, rrd->live_head->last_up);
964         return -1;
965     }
966     return 0;
969 /*
970  * Update pdp_new by interpreting the updvals according to the DS type
971  * (COUNTER, GAUGE, etc.).
972  *
973  * Returns 0 on success, -1 on error.
974  */
975 static int update_pdp_prep(
976     rrd_t *rrd,
977     char **updvals,
978     rrd_value_t *pdp_new,
979     double interval)
981     unsigned long ds_idx;
982     int       ii;
983     char     *endptr;   /* used in the conversion */
984     double    rate;
985     char     *old_locale;
986     enum dst_en dst_idx;
988     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
989         dst_idx = dst_conv(rrd->ds_def[ds_idx].dst);
991         /* make sure we do not build diffs with old last_ds values */
992         if (rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt < interval) {
993             strncpy(rrd->pdp_prep[ds_idx].last_ds, "U", LAST_DS_LEN - 1);
994             rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
995         }
997         /* NOTE: DST_CDEF should never enter this if block, because
998          * updvals[ds_idx+1][0] is initialized to 'U'; unless the caller
999          * accidently specified a value for the DST_CDEF. To handle this case,
1000          * an extra check is required. */
1002         if ((updvals[ds_idx + 1][0] != 'U') &&
1003             (dst_idx != DST_CDEF) &&
1004             rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt >= interval) {
1005             rate = DNAN;
1007             /* pdp_new contains rate * time ... eg the bytes transferred during
1008              * the interval. Doing it this way saves a lot of math operations
1009              */
1010             switch (dst_idx) {
1011             case DST_COUNTER:
1012             case DST_DERIVE:
1013                 for (ii = 0; updvals[ds_idx + 1][ii] != '\0'; ii++) {
1014                     if ((updvals[ds_idx + 1][ii] < '0'
1015                          || updvals[ds_idx + 1][ii] > '9')
1016                         && (ii != 0 && updvals[ds_idx + 1][ii] != '-')) {
1017                         rrd_set_error("not a simple integer: '%s'",
1018                                       updvals[ds_idx + 1]);
1019                         return -1;
1020                     }
1021                 }
1022                 if (rrd->pdp_prep[ds_idx].last_ds[0] != 'U') {
1023                     pdp_new[ds_idx] =
1024                         rrd_diff(updvals[ds_idx + 1],
1025                                  rrd->pdp_prep[ds_idx].last_ds);
1026                     if (dst_idx == DST_COUNTER) {
1027                         /* simple overflow catcher. This will fail
1028                          * terribly for non 32 or 64 bit counters
1029                          * ... are there any others in SNMP land?
1030                          */
1031                         if (pdp_new[ds_idx] < (double) 0.0)
1032                             pdp_new[ds_idx] += (double) 4294967296.0;   /* 2^32 */
1033                         if (pdp_new[ds_idx] < (double) 0.0)
1034                             pdp_new[ds_idx] += (double) 18446744069414584320.0; /* 2^64-2^32 */
1035                     }
1036                     rate = pdp_new[ds_idx] / interval;
1037                 } else {
1038                     pdp_new[ds_idx] = DNAN;
1039                 }
1040                 break;
1041             case DST_ABSOLUTE:
1042                 old_locale = setlocale(LC_NUMERIC, "C");
1043                 errno = 0;
1044                 pdp_new[ds_idx] = strtod(updvals[ds_idx + 1], &endptr);
1045                 setlocale(LC_NUMERIC, old_locale);
1046                 if (errno > 0) {
1047                     rrd_set_error("converting '%s' to float: %s",
1048                                   updvals[ds_idx + 1], rrd_strerror(errno));
1049                     return -1;
1050                 };
1051                 if (endptr[0] != '\0') {
1052                     rrd_set_error
1053                         ("conversion of '%s' to float not complete: tail '%s'",
1054                          updvals[ds_idx + 1], endptr);
1055                     return -1;
1056                 }
1057                 rate = pdp_new[ds_idx] / interval;
1058                 break;
1059             case DST_GAUGE:
1060                 errno = 0;
1061                 old_locale = setlocale(LC_NUMERIC, "C");
1062                 pdp_new[ds_idx] =
1063                     strtod(updvals[ds_idx + 1], &endptr) * interval;
1064                 setlocale(LC_NUMERIC, old_locale);
1065                 if (errno) {
1066                     rrd_set_error("converting '%s' to float: %s",
1067                                   updvals[ds_idx + 1], rrd_strerror(errno));
1068                     return -1;
1069                 };
1070                 if (endptr[0] != '\0') {
1071                     rrd_set_error
1072                         ("conversion of '%s' to float not complete: tail '%s'",
1073                          updvals[ds_idx + 1], endptr);
1074                     return -1;
1075                 }
1076                 rate = pdp_new[ds_idx] / interval;
1077                 break;
1078             default:
1079                 rrd_set_error("rrd contains unknown DS type : '%s'",
1080                               rrd->ds_def[ds_idx].dst);
1081                 return -1;
1082             }
1083             /* break out of this for loop if the error string is set */
1084             if (rrd_test_error()) {
1085                 return -1;
1086             }
1087             /* make sure pdp_temp is neither too large or too small
1088              * if any of these occur it becomes unknown ...
1089              * sorry folks ... */
1090             if (!isnan(rate) &&
1091                 ((!isnan(rrd->ds_def[ds_idx].par[DS_max_val].u_val) &&
1092                   rate > rrd->ds_def[ds_idx].par[DS_max_val].u_val) ||
1093                  (!isnan(rrd->ds_def[ds_idx].par[DS_min_val].u_val) &&
1094                   rate < rrd->ds_def[ds_idx].par[DS_min_val].u_val))) {
1095                 pdp_new[ds_idx] = DNAN;
1096             }
1097         } else {
1098             /* no news is news all the same */
1099             pdp_new[ds_idx] = DNAN;
1100         }
1103         /* make a copy of the command line argument for the next run */
1104 #ifdef DEBUG
1105         fprintf(stderr, "prep ds[%lu]\t"
1106                 "last_arg '%s'\t"
1107                 "this_arg '%s'\t"
1108                 "pdp_new %10.2f\n",
1109                 ds_idx, rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1110                 pdp_new[ds_idx]);
1111 #endif
1112         strncpy(rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1113                 LAST_DS_LEN - 1);
1114         rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
1115     }
1116     return 0;
1119 /*
1120  * How many PDP steps have elapsed since the last update? Returns the answer,
1121  * and stores the time between the last update and the last PDP in pre_time,
1122  * and the time between the last PDP and the current time in post_int.
1123  */
1124 static int calculate_elapsed_steps(
1125     rrd_t *rrd,
1126     unsigned long current_time,
1127     unsigned long current_time_usec,
1128     double interval,
1129     double *pre_int,
1130     double *post_int,
1131     unsigned long *proc_pdp_cnt)
1133     unsigned long proc_pdp_st;  /* which pdp_st was the last to be processed */
1134     unsigned long occu_pdp_st;  /* when was the pdp_st before the last update
1135                                  * time */
1136     unsigned long proc_pdp_age; /* how old was the data in the pdp prep area 
1137                                  * when it was last updated */
1138     unsigned long occu_pdp_age; /* how long ago was the last pdp_step time */
1140     /* when was the current pdp started */
1141     proc_pdp_age = rrd->live_head->last_up % rrd->stat_head->pdp_step;
1142     proc_pdp_st = rrd->live_head->last_up - proc_pdp_age;
1144     /* when did the last pdp_st occur */
1145     occu_pdp_age = current_time % rrd->stat_head->pdp_step;
1146     occu_pdp_st = current_time - occu_pdp_age;
1148     if (occu_pdp_st > proc_pdp_st) {
1149         /* OK we passed the pdp_st moment */
1150         *pre_int = (long) occu_pdp_st - rrd->live_head->last_up;    /* how much of the input data
1151                                                                      * occurred before the latest
1152                                                                      * pdp_st moment*/
1153         *pre_int -= ((double) rrd->live_head->last_up_usec) / 1e6f; /* adjust usecs */
1154         *post_int = occu_pdp_age;   /* how much after it */
1155         *post_int += ((double) current_time_usec) / 1e6f;   /* adjust usecs */
1156     } else {
1157         *pre_int = interval;
1158         *post_int = 0;
1159     }
1161     *proc_pdp_cnt = proc_pdp_st / rrd->stat_head->pdp_step;
1163 #ifdef DEBUG
1164     printf("proc_pdp_age %lu\t"
1165            "proc_pdp_st %lu\t"
1166            "occu_pfp_age %lu\t"
1167            "occu_pdp_st %lu\t"
1168            "int %lf\t"
1169            "pre_int %lf\t"
1170            "post_int %lf\n", proc_pdp_age, proc_pdp_st,
1171            occu_pdp_age, occu_pdp_st, interval, *pre_int, *post_int);
1172 #endif
1174     /* compute the number of elapsed pdp_st moments */
1175     return (occu_pdp_st - proc_pdp_st) / rrd->stat_head->pdp_step;
1178 /*
1179  * Increment the PDP values by the values in pdp_new, or else initialize them.
1180  */
1181 static void simple_update(
1182     rrd_t *rrd,
1183     double interval,
1184     rrd_value_t *pdp_new)
1186     int       i;
1188     for (i = 0; i < (signed) rrd->stat_head->ds_cnt; i++) {
1189         if (isnan(pdp_new[i])) {
1190             /* this is not really accurate if we use subsecond data arrival time
1191                should have thought of it when going subsecond resolution ...
1192                sorry next format change we will have it! */
1193             rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt +=
1194                 floor(interval);
1195         } else {
1196             if (isnan(rrd->pdp_prep[i].scratch[PDP_val].u_val)) {
1197                 rrd->pdp_prep[i].scratch[PDP_val].u_val = pdp_new[i];
1198             } else {
1199                 rrd->pdp_prep[i].scratch[PDP_val].u_val += pdp_new[i];
1200             }
1201         }
1202 #ifdef DEBUG
1203         fprintf(stderr,
1204                 "NO PDP  ds[%i]\t"
1205                 "value %10.2f\t"
1206                 "unkn_sec %5lu\n",
1207                 i,
1208                 rrd->pdp_prep[i].scratch[PDP_val].u_val,
1209                 rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
1210 #endif
1211     }
1214 /*
1215  * Call process_pdp_st for each DS.
1216  *
1217  * Returns 0 on success, -1 on error.
1218  */
1219 static int process_all_pdp_st(
1220     rrd_t *rrd,
1221     double interval,
1222     double pre_int,
1223     double post_int,
1224     unsigned long elapsed_pdp_st,
1225     rrd_value_t *pdp_new,
1226     rrd_value_t *pdp_temp)
1228     unsigned long ds_idx;
1230     /* in pdp_prep[].scratch[PDP_val].u_val we have collected
1231        rate*seconds which occurred up to the last run.
1232        pdp_new[] contains rate*seconds from the latest run.
1233        pdp_temp[] will contain the rate for cdp */
1235     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1236         if (process_pdp_st(rrd, ds_idx, interval, pre_int, post_int,
1237                            elapsed_pdp_st * rrd->stat_head->pdp_step,
1238                            pdp_new, pdp_temp) == -1) {
1239             return -1;
1240         }
1241 #ifdef DEBUG
1242         fprintf(stderr, "PDP UPD ds[%lu]\t"
1243                 "pdp_temp %10.2f\t"
1244                 "new_prep %10.2f\t"
1245                 "new_unkn_sec %5lu\n",
1246                 ds_idx, pdp_temp[ds_idx],
1247                 rrd->pdp_prep[ds_idx].scratch[PDP_val].u_val,
1248                 rrd->pdp_prep[ds_idx].scratch[PDP_unkn_sec_cnt].u_cnt);
1249 #endif
1250     }
1251     return 0;
1254 /*
1255  * Process an update that occurs after one of the PDP moments.
1256  * Increments the PDP value, sets NAN if time greater than the
1257  * heartbeats have elapsed, processes CDEFs.
1258  *
1259  * Returns 0 on success, -1 on error.
1260  */
1261 static int process_pdp_st(
1262     rrd_t *rrd,
1263     unsigned long ds_idx,
1264     double interval,
1265     double pre_int,
1266     double post_int,
1267     long diff_pdp_st,
1268     rrd_value_t *pdp_new,
1269     rrd_value_t *pdp_temp)
1271     int       i;
1273     /* update pdp_prep to the current pdp_st. */
1274     double    pre_unknown = 0.0;
1275     unival   *scratch = rrd->pdp_prep[ds_idx].scratch;
1276     unsigned long mrhb = rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt;
1278     rpnstack_t rpnstack;    /* used for COMPUTE DS */
1280     rpnstack_init(&rpnstack);
1283     if (isnan(pdp_new[ds_idx])) {
1284         /* a final bit of unknown to be added bevore calculation
1285            we use a temporary variable for this so that we
1286            don't have to turn integer lines before using the value */
1287         pre_unknown = pre_int;
1288     } else {
1289         if (isnan(scratch[PDP_val].u_val)) {
1290             scratch[PDP_val].u_val = 0;
1291         }
1292         scratch[PDP_val].u_val += pdp_new[ds_idx] / interval * pre_int;
1293     }
1295     /* if too much of the pdp_prep is unknown we dump it */
1296     /* if the interval is larger thatn mrhb we get NAN */
1297     if ((interval > mrhb) ||
1298         (diff_pdp_st <= (signed) scratch[PDP_unkn_sec_cnt].u_cnt)) {
1299         pdp_temp[ds_idx] = DNAN;
1300     } else {
1301         pdp_temp[ds_idx] = scratch[PDP_val].u_val /
1302             ((double) (diff_pdp_st - scratch[PDP_unkn_sec_cnt].u_cnt) -
1303              pre_unknown);
1304     }
1306     /* process CDEF data sources; remember each CDEF DS can
1307      * only reference other DS with a lower index number */
1308     if (dst_conv(rrd->ds_def[ds_idx].dst) == DST_CDEF) {
1309         rpnp_t   *rpnp;
1311         rpnp =
1312             rpn_expand((rpn_cdefds_t *) &(rrd->ds_def[ds_idx].par[DS_cdef]));
1313         /* substitute data values for OP_VARIABLE nodes */
1314         for (i = 0; rpnp[i].op != OP_END; i++) {
1315             if (rpnp[i].op == OP_VARIABLE) {
1316                 rpnp[i].op = OP_NUMBER;
1317                 rpnp[i].val = pdp_temp[rpnp[i].ptr];
1318             }
1319         }
1320         /* run the rpn calculator */
1321         if (rpn_calc(rpnp, &rpnstack, 0, pdp_temp, ds_idx) == -1) {
1322             free(rpnp);
1323             rpnstack_free(&rpnstack);
1324             return -1;
1325         }
1326     }
1328     /* make pdp_prep ready for the next run */
1329     if (isnan(pdp_new[ds_idx])) {
1330         /* this is not realy accurate if we use subsecond data arival time
1331            should have thought of it when going subsecond resolution ...
1332            sorry next format change we will have it! */
1333         scratch[PDP_unkn_sec_cnt].u_cnt = floor(post_int);
1334         scratch[PDP_val].u_val = DNAN;
1335     } else {
1336         scratch[PDP_unkn_sec_cnt].u_cnt = 0;
1337         scratch[PDP_val].u_val = pdp_new[ds_idx] / interval * post_int;
1338     }
1339     rpnstack_free(&rpnstack);
1340     return 0;
1343 /*
1344  * Iterate over all the RRAs for a given DS and:
1345  * 1. Decide whether to schedule a smooth later
1346  * 2. Decide whether to skip updating SEASONAL and DEVSEASONAL
1347  * 3. Update the CDP
1348  *
1349  * Returns 0 on success, -1 on error
1350  */
1351 static int update_all_cdp_prep(
1352     rrd_t *rrd,
1353     unsigned long *rra_step_cnt,
1354     unsigned long rra_begin,
1355     rrd_file_t *rrd_file,
1356     unsigned long elapsed_pdp_st,
1357     unsigned long proc_pdp_cnt,
1358     rrd_value_t **last_seasonal_coef,
1359     rrd_value_t **seasonal_coef,
1360     rrd_value_t *pdp_temp,
1361     unsigned long *rra_current,
1362     unsigned long *skip_update,
1363     int *schedule_smooth)
1365     unsigned long rra_idx;
1367     /* index into the CDP scratch array */
1368     enum cf_en current_cf;
1369     unsigned long rra_start;
1371     /* number of rows to be updated in an RRA for a data value. */
1372     unsigned long start_pdp_offset;
1374     rra_start = rra_begin;
1375     for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1376         current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1377         start_pdp_offset =
1378             rrd->rra_def[rra_idx].pdp_cnt -
1379             proc_pdp_cnt % rrd->rra_def[rra_idx].pdp_cnt;
1380         skip_update[rra_idx] = 0;
1381         if (start_pdp_offset <= elapsed_pdp_st) {
1382             rra_step_cnt[rra_idx] = (elapsed_pdp_st - start_pdp_offset) /
1383                 rrd->rra_def[rra_idx].pdp_cnt + 1;
1384         } else {
1385             rra_step_cnt[rra_idx] = 0;
1386         }
1388         if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1389             /* If this is a bulk update, we need to skip ahead in the seasonal arrays
1390              * so that they will be correct for the next observed value; note that for
1391              * the bulk update itself, no update will occur to DEVSEASONAL or SEASONAL;
1392              * futhermore, HWPREDICT and DEVPREDICT will be set to DNAN. */
1393             if (rra_step_cnt[rra_idx] > 1) {
1394                 skip_update[rra_idx] = 1;
1395                 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1396                                 elapsed_pdp_st, last_seasonal_coef);
1397                 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1398                                 elapsed_pdp_st + 1, seasonal_coef);
1399             }
1400             /* periodically run a smoother for seasonal effects */
1401             if (do_schedule_smooth(rrd, rra_idx, elapsed_pdp_st)) {
1402 #ifdef DEBUG
1403                 fprintf(stderr,
1404                         "schedule_smooth: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
1405                         rrd->rra_ptr[rra_idx].cur_row, elapsed_pdp_st,
1406                         rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].
1407                         u_cnt);
1408 #endif
1409                 *schedule_smooth = 1;
1410             }
1411             *rra_current = rrd_tell(rrd_file);
1412         }
1413         if (rrd_test_error())
1414             return -1;
1416         if (update_cdp_prep
1417             (rrd, elapsed_pdp_st, start_pdp_offset, rra_step_cnt, rra_idx,
1418              pdp_temp, *last_seasonal_coef, *seasonal_coef,
1419              current_cf) == -1) {
1420             return -1;
1421         }
1422         rra_start +=
1423             rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt *
1424             sizeof(rrd_value_t);
1425     }
1426     return 0;
1429 /* 
1430  * Are we due for a smooth? Also increments our position in the burn-in cycle.
1431  */
1432 static int do_schedule_smooth(
1433     rrd_t *rrd,
1434     unsigned long rra_idx,
1435     unsigned long elapsed_pdp_st)
1437     unsigned long cdp_idx = rra_idx * (rrd->stat_head->ds_cnt);
1438     unsigned long cur_row = rrd->rra_ptr[rra_idx].cur_row;
1439     unsigned long row_cnt = rrd->rra_def[rra_idx].row_cnt;
1440     unsigned long seasonal_smooth_idx =
1441         rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].u_cnt;
1442     unsigned long *init_seasonal =
1443         &(rrd->cdp_prep[cdp_idx].scratch[CDP_init_seasonal].u_cnt);
1445     /* Need to use first cdp parameter buffer to track burnin (burnin requires
1446      * a specific smoothing schedule).  The CDP_init_seasonal parameter is
1447      * really an RRA level, not a data source within RRA level parameter, but
1448      * the rra_def is read only for rrd_update (not flushed to disk). */
1449     if (*init_seasonal > BURNIN_CYCLES) {
1450         /* someone has no doubt invented a trick to deal with this wrap around,
1451          * but at least this code is clear. */
1452         if (seasonal_smooth_idx > cur_row) {
1453             /* here elapsed_pdp_st = rra_step_cnt[rra_idx] because of 1-1 mapping
1454              * between PDP and CDP */
1455             return (cur_row + elapsed_pdp_st >= seasonal_smooth_idx);
1456         }
1457         /* can't rely on negative numbers because we are working with
1458          * unsigned values */
1459         return (cur_row + elapsed_pdp_st >= row_cnt
1460                 && cur_row + elapsed_pdp_st >= row_cnt + seasonal_smooth_idx);
1461     }
1462     /* mark off one of the burn-in cycles */
1463     return (cur_row + elapsed_pdp_st >= row_cnt && ++(*init_seasonal));
1466 /*
1467  * For a given RRA, iterate over the data sources and call the appropriate
1468  * consolidation function.
1469  *
1470  * Returns 0 on success, -1 on error.
1471  */
1472 static int update_cdp_prep(
1473     rrd_t *rrd,
1474     unsigned long elapsed_pdp_st,
1475     unsigned long start_pdp_offset,
1476     unsigned long *rra_step_cnt,
1477     int rra_idx,
1478     rrd_value_t *pdp_temp,
1479     rrd_value_t *last_seasonal_coef,
1480     rrd_value_t *seasonal_coef,
1481     int current_cf)
1483     unsigned long ds_idx, cdp_idx;
1485     /* update CDP_PREP areas */
1486     /* loop over data soures within each RRA */
1487     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1489         cdp_idx = rra_idx * rrd->stat_head->ds_cnt + ds_idx;
1491         if (rrd->rra_def[rra_idx].pdp_cnt > 1) {
1492             update_cdp(rrd->cdp_prep[cdp_idx].scratch, current_cf,
1493                        pdp_temp[ds_idx], rra_step_cnt[rra_idx],
1494                        elapsed_pdp_st, start_pdp_offset,
1495                        rrd->rra_def[rra_idx].pdp_cnt,
1496                        rrd->rra_def[rra_idx].par[RRA_cdp_xff_val].u_val,
1497                        rra_idx, ds_idx);
1498         } else {
1499             /* Nothing to consolidate if there's one PDP per CDP. However, if
1500              * we've missed some PDPs, let's update null counters etc. */
1501             if (elapsed_pdp_st > 2) {
1502                 reset_cdp(rrd, elapsed_pdp_st, pdp_temp, last_seasonal_coef,
1503                           seasonal_coef, rra_idx, ds_idx, cdp_idx,
1504                           current_cf);
1505             }
1506         }
1508         if (rrd_test_error())
1509             return -1;
1510     }                   /* endif data sources loop */
1511     return 0;
1514 /*
1515  * Given the new reading (pdp_temp_val), update or initialize the CDP value,
1516  * primary value, secondary value, and # of unknowns.
1517  */
1518 static void update_cdp(
1519     unival *scratch,
1520     int current_cf,
1521     rrd_value_t pdp_temp_val,
1522     unsigned long rra_step_cnt,
1523     unsigned long elapsed_pdp_st,
1524     unsigned long start_pdp_offset,
1525     unsigned long pdp_cnt,
1526     rrd_value_t xff,
1527     int i,
1528     int ii)
1530     /* shorthand variables */
1531     rrd_value_t *cdp_val = &scratch[CDP_val].u_val;
1532     rrd_value_t *cdp_primary_val = &scratch[CDP_primary_val].u_val;
1533     rrd_value_t *cdp_secondary_val = &scratch[CDP_secondary_val].u_val;
1534     unsigned long *cdp_unkn_pdp_cnt = &scratch[CDP_unkn_pdp_cnt].u_cnt;
1536     if (rra_step_cnt) {
1537         /* If we are in this block, as least 1 CDP value will be written to
1538          * disk, this is the CDP_primary_val entry. If more than 1 value needs
1539          * to be written, then the "fill in" value is the CDP_secondary_val
1540          * entry. */
1541         if (isnan(pdp_temp_val)) {
1542             *cdp_unkn_pdp_cnt += start_pdp_offset;
1543             *cdp_secondary_val = DNAN;
1544         } else {
1545             /* CDP_secondary value is the RRA "fill in" value for intermediary
1546              * CDP data entries. No matter the CF, the value is the same because
1547              * the average, max, min, and last of a list of identical values is
1548              * the same, namely, the value itself. */
1549             *cdp_secondary_val = pdp_temp_val;
1550         }
1552         if (*cdp_unkn_pdp_cnt > pdp_cnt * xff) {
1553             *cdp_primary_val = DNAN;
1554             if (current_cf == CF_AVERAGE) {
1555                 *cdp_val =
1556                     initialize_average_carry_over(pdp_temp_val,
1557                                                   elapsed_pdp_st,
1558                                                   start_pdp_offset, pdp_cnt);
1559             } else {
1560                 *cdp_val = pdp_temp_val;
1561             }
1562         } else {
1563             initialize_cdp_val(scratch, current_cf, pdp_temp_val,
1564                                elapsed_pdp_st, start_pdp_offset, pdp_cnt);
1565         }               /* endif meets xff value requirement for a valid value */
1566         /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
1567          * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1568         if (isnan(pdp_temp_val))
1569             *cdp_unkn_pdp_cnt = (elapsed_pdp_st - start_pdp_offset) % pdp_cnt;
1570         else
1571             *cdp_unkn_pdp_cnt = 0;
1572     } else {            /* rra_step_cnt[i]  == 0 */
1574 #ifdef DEBUG
1575         if (isnan(*cdp_val)) {
1576             fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, DNAN\n",
1577                     i, ii);
1578         } else {
1579             fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, %10.2f\n",
1580                     i, ii, *cdp_val);
1581         }
1582 #endif
1583         if (isnan(pdp_temp_val)) {
1584             *cdp_unkn_pdp_cnt += elapsed_pdp_st;
1585         } else {
1586             *cdp_val =
1587                 calculate_cdp_val(*cdp_val, pdp_temp_val, elapsed_pdp_st,
1588                                   current_cf, i, ii);
1589         }
1590     }
1593 /*
1594  * Set the CDP_primary_val and CDP_val to the appropriate initial value based
1595  * on the type of consolidation function.
1596  */
1597 static void initialize_cdp_val(
1598     unival *scratch,
1599     int current_cf,
1600     rrd_value_t pdp_temp_val,
1601     unsigned long elapsed_pdp_st,
1602     unsigned long start_pdp_offset,
1603     unsigned long pdp_cnt)
1605     rrd_value_t cum_val, cur_val;
1607     switch (current_cf) {
1608     case CF_AVERAGE:
1609         cum_val = IFDNAN(scratch[CDP_val].u_val, 0.0);
1610         cur_val = IFDNAN(pdp_temp_val, 0.0);
1611         scratch[CDP_primary_val].u_val =
1612             (cum_val + cur_val * start_pdp_offset) /
1613             (pdp_cnt - scratch[CDP_unkn_pdp_cnt].u_cnt);
1614         scratch[CDP_val].u_val =
1615             initialize_average_carry_over(pdp_temp_val, elapsed_pdp_st,
1616                                           start_pdp_offset, pdp_cnt);
1617         break;
1618     case CF_MAXIMUM:
1619         cum_val = IFDNAN(scratch[CDP_val].u_val, -DINF);
1620         cur_val = IFDNAN(pdp_temp_val, -DINF);
1621 #if 0
1622 #ifdef DEBUG
1623         if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1624             fprintf(stderr,
1625                     "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1626                     i, ii);
1627             exit(-1);
1628         }
1629 #endif
1630 #endif
1631         if (cur_val > cum_val)
1632             scratch[CDP_primary_val].u_val = cur_val;
1633         else
1634             scratch[CDP_primary_val].u_val = cum_val;
1635         /* initialize carry over value */
1636         scratch[CDP_val].u_val = pdp_temp_val;
1637         break;
1638     case CF_MINIMUM:
1639         cum_val = IFDNAN(scratch[CDP_val].u_val, DINF);
1640         cur_val = IFDNAN(pdp_temp_val, DINF);
1641 #if 0
1642 #ifdef DEBUG
1643         if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1644             fprintf(stderr,
1645                     "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!", i,
1646                     ii);
1647             exit(-1);
1648         }
1649 #endif
1650 #endif
1651         if (cur_val < cum_val)
1652             scratch[CDP_primary_val].u_val = cur_val;
1653         else
1654             scratch[CDP_primary_val].u_val = cum_val;
1655         /* initialize carry over value */
1656         scratch[CDP_val].u_val = pdp_temp_val;
1657         break;
1658     case CF_LAST:
1659     default:
1660         scratch[CDP_primary_val].u_val = pdp_temp_val;
1661         /* initialize carry over value */
1662         scratch[CDP_val].u_val = pdp_temp_val;
1663         break;
1664     }
1667 /*
1668  * Update the consolidation function for Holt-Winters functions as
1669  * well as other functions that don't actually consolidate multiple
1670  * PDPs.
1671  */
1672 static void reset_cdp(
1673     rrd_t *rrd,
1674     unsigned long elapsed_pdp_st,
1675     rrd_value_t *pdp_temp,
1676     rrd_value_t *last_seasonal_coef,
1677     rrd_value_t *seasonal_coef,
1678     int rra_idx,
1679     int ds_idx,
1680     int cdp_idx,
1681     enum cf_en current_cf)
1683     unival   *scratch = rrd->cdp_prep[cdp_idx].scratch;
1685     switch (current_cf) {
1686     case CF_AVERAGE:
1687     default:
1688         scratch[CDP_primary_val].u_val = pdp_temp[ds_idx];
1689         scratch[CDP_secondary_val].u_val = pdp_temp[ds_idx];
1690         break;
1691     case CF_SEASONAL:
1692     case CF_DEVSEASONAL:
1693         /* need to update cached seasonal values, so they are consistent
1694          * with the bulk update */
1695         /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1696          * CDP_last_deviation are the same. */
1697         scratch[CDP_hw_last_seasonal].u_val = last_seasonal_coef[ds_idx];
1698         scratch[CDP_hw_seasonal].u_val = seasonal_coef[ds_idx];
1699         break;
1700     case CF_HWPREDICT:
1701     case CF_MHWPREDICT:
1702         /* need to update the null_count and last_null_count.
1703          * even do this for non-DNAN pdp_temp because the
1704          * algorithm is not learning from batch updates. */
1705         scratch[CDP_null_count].u_cnt += elapsed_pdp_st;
1706         scratch[CDP_last_null_count].u_cnt += elapsed_pdp_st - 1;
1707         /* fall through */
1708     case CF_DEVPREDICT:
1709         scratch[CDP_primary_val].u_val = DNAN;
1710         scratch[CDP_secondary_val].u_val = DNAN;
1711         break;
1712     case CF_FAILURES:
1713         /* do not count missed bulk values as failures */
1714         scratch[CDP_primary_val].u_val = 0;
1715         scratch[CDP_secondary_val].u_val = 0;
1716         /* need to reset violations buffer.
1717          * could do this more carefully, but for now, just
1718          * assume a bulk update wipes away all violations. */
1719         erase_violations(rrd, cdp_idx, rra_idx);
1720         break;
1721     }
1724 static rrd_value_t initialize_average_carry_over(
1725     rrd_value_t pdp_temp_val,
1726     unsigned long elapsed_pdp_st,
1727     unsigned long start_pdp_offset,
1728     unsigned long pdp_cnt)
1730     /* initialize carry over value */
1731     if (isnan(pdp_temp_val)) {
1732         return DNAN;
1733     }
1734     return pdp_temp_val * ((elapsed_pdp_st - start_pdp_offset) % pdp_cnt);
1737 /*
1738  * Update or initialize a CDP value based on the consolidation
1739  * function.
1740  *
1741  * Returns the new value.
1742  */
1743 static rrd_value_t calculate_cdp_val(
1744     rrd_value_t cdp_val,
1745     rrd_value_t pdp_temp_val,
1746     unsigned long elapsed_pdp_st,
1747     int current_cf,
1748     int i,
1749     int ii)
1751     if (isnan(cdp_val)) {
1752         if (current_cf == CF_AVERAGE) {
1753             pdp_temp_val *= elapsed_pdp_st;
1754         }
1755 #ifdef DEBUG
1756         fprintf(stderr, "Initialize CDP_val for RRA %d DS %d: %10.2f\n",
1757                 i, ii, pdp_temp_val);
1758 #endif
1759         return pdp_temp_val;
1760     }
1761     if (current_cf == CF_AVERAGE)
1762         return cdp_val + pdp_temp_val * elapsed_pdp_st;
1763     if (current_cf == CF_MINIMUM)
1764         return (pdp_temp_val < cdp_val) ? pdp_temp_val : cdp_val;
1765     if (current_cf == CF_MAXIMUM)
1766         return (pdp_temp_val > cdp_val) ? pdp_temp_val : cdp_val;
1768     return pdp_temp_val;
1771 /*
1772  * For each RRA, update the seasonal values and then call update_aberrant_CF
1773  * for each data source.
1774  *
1775  * Return 0 on success, -1 on error.
1776  */
1777 static int update_aberrant_cdps(
1778     rrd_t *rrd,
1779     rrd_file_t *rrd_file,
1780     unsigned long rra_begin,
1781     unsigned long *rra_current,
1782     unsigned long elapsed_pdp_st,
1783     rrd_value_t *pdp_temp,
1784     rrd_value_t **seasonal_coef)
1786     unsigned long rra_idx, ds_idx, j;
1788     /* number of PDP steps since the last update that
1789      * are assigned to the first CDP to be generated
1790      * since the last update. */
1791     unsigned short scratch_idx;
1792     unsigned long rra_start;
1793     enum cf_en current_cf;
1795     /* this loop is only entered if elapsed_pdp_st < 3 */
1796     for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1797          j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val) {
1798         rra_start = rra_begin;
1799         for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1800             if (rrd->rra_def[rra_idx].pdp_cnt == 1) {
1801                 current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1802                 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1803                     if (scratch_idx == CDP_primary_val) {
1804                         lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1805                                         elapsed_pdp_st + 1, seasonal_coef);
1806                     } else {
1807                         lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1808                                         elapsed_pdp_st + 2, seasonal_coef);
1809                     }
1810                     *rra_current = rrd_tell(rrd_file);
1811                 }
1812                 if (rrd_test_error())
1813                     return -1;
1814                 /* loop over data soures within each RRA */
1815                 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1816                     update_aberrant_CF(rrd, pdp_temp[ds_idx], current_cf,
1817                                        rra_idx * (rrd->stat_head->ds_cnt) +
1818                                        ds_idx, rra_idx, ds_idx, scratch_idx,
1819                                        *seasonal_coef);
1820                 }
1821             }
1822             rra_start += rrd->rra_def[rra_idx].row_cnt
1823                 * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
1824         }
1825     }
1826     return 0;
1829 /* 
1830  * Move sequentially through the file, writing one RRA at a time.  Note this
1831  * architecture divorces the computation of CDP with flushing updated RRA
1832  * entries to disk.
1833  *
1834  * Return 0 on success, -1 on error.
1835  */
1836 static int write_to_rras(
1837     rrd_t *rrd,
1838     rrd_file_t *rrd_file,
1839     unsigned long *rra_step_cnt,
1840     unsigned long rra_begin,
1841     unsigned long *rra_current,
1842     time_t current_time,
1843     unsigned long *skip_update,
1844     info_t **pcdp_summary)
1846     unsigned long rra_idx;
1847     unsigned long rra_start;
1848     unsigned long rra_pos_tmp;  /* temporary byte pointer. */
1849     time_t    rra_time = 0; /* time of update for a RRA */
1851     /* Ready to write to disk */
1852     rra_start = rra_begin;
1853     for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1854         /* skip unless there's something to write */
1855         if (rra_step_cnt[rra_idx]) {
1856             /* write the first row */
1857 #ifdef DEBUG
1858             fprintf(stderr, "  -- RRA Preseek %ld\n", rrd_file->pos);
1859 #endif
1860             rrd->rra_ptr[rra_idx].cur_row++;
1861             if (rrd->rra_ptr[rra_idx].cur_row >=
1862                 rrd->rra_def[rra_idx].row_cnt)
1863                 rrd->rra_ptr[rra_idx].cur_row = 0;  /* wrap around */
1864             /* position on the first row */
1865             rra_pos_tmp = rra_start +
1866                 (rrd->stat_head->ds_cnt) * (rrd->rra_ptr[rra_idx].cur_row) *
1867                 sizeof(rrd_value_t);
1868             if (rra_pos_tmp != *rra_current) {
1869                 if (rrd_seek(rrd_file, rra_pos_tmp, SEEK_SET) != 0) {
1870                     rrd_set_error("seek error in rrd");
1871                     return -1;
1872                 }
1873                 *rra_current = rra_pos_tmp;
1874             }
1875 #ifdef DEBUG
1876             fprintf(stderr, "  -- RRA Postseek %ld\n", rrd_file->pos);
1877 #endif
1878             if (!skip_update[rra_idx]) {
1879                 if (*pcdp_summary != NULL) {
1880                     rra_time = (current_time - current_time
1881                                 % (rrd->rra_def[rra_idx].pdp_cnt *
1882                                    rrd->stat_head->pdp_step))
1883                         -
1884                         ((rra_step_cnt[rra_idx] -
1885                           1) * rrd->rra_def[rra_idx].pdp_cnt *
1886                          rrd->stat_head->pdp_step);
1887                 }
1888                 if (write_RRA_row
1889                     (rrd_file, rrd, rra_idx, rra_current, CDP_primary_val,
1890                      pcdp_summary, rra_time) == -1)
1891                     return -1;
1892             }
1894             /* write other rows of the bulk update, if any */
1895             for (; rra_step_cnt[rra_idx] > 1; rra_step_cnt[rra_idx]--) {
1896                 if (++rrd->rra_ptr[rra_idx].cur_row ==
1897                     rrd->rra_def[rra_idx].row_cnt) {
1898 #ifdef DEBUG
1899                     fprintf(stderr,
1900                             "Wraparound for RRA %s, %lu updates left\n",
1901                             rrd->rra_def[rra_idx].cf_nam,
1902                             rra_step_cnt[rra_idx] - 1);
1903 #endif
1904                     /* wrap */
1905                     rrd->rra_ptr[rra_idx].cur_row = 0;
1906                     /* seek back to beginning of current rra */
1907                     if (rrd_seek(rrd_file, rra_start, SEEK_SET) != 0) {
1908                         rrd_set_error("seek error in rrd");
1909                         return -1;
1910                     }
1911 #ifdef DEBUG
1912                     fprintf(stderr, "  -- Wraparound Postseek %ld\n",
1913                             rrd_file->pos);
1914 #endif
1915                     *rra_current = rra_start;
1916                 }
1917                 if (!skip_update[rra_idx]) {
1918                     if (*pcdp_summary != NULL) {
1919                         rra_time = (current_time - current_time
1920                                     % (rrd->rra_def[rra_idx].pdp_cnt *
1921                                        rrd->stat_head->pdp_step))
1922                             -
1923                             ((rra_step_cnt[rra_idx] -
1924                               2) * rrd->rra_def[rra_idx].pdp_cnt *
1925                              rrd->stat_head->pdp_step);
1926                     }
1927                     if (write_RRA_row(rrd_file, rrd, rra_idx, rra_current,
1928                                       CDP_secondary_val, pcdp_summary,
1929                                       rra_time) == -1)
1930                         return -1;
1931                 }
1932             }
1933         }
1934         rra_start += rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt *
1935             sizeof(rrd_value_t);
1936     }                   /* RRA LOOP */
1938     return 0;
1941 /*
1942  * Write out one row of values (one value per DS) to the archive.
1943  *
1944  * Returns 0 on success, -1 on error.
1945  */
1946 static int write_RRA_row(
1947     rrd_file_t *rrd_file,
1948     rrd_t *rrd,
1949     unsigned long rra_idx,
1950     unsigned long *rra_current,
1951     unsigned short CDP_scratch_idx,
1952     info_t **pcdp_summary,
1953     time_t rra_time)
1955     unsigned long ds_idx, cdp_idx;
1956     infoval   iv;
1958     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1959         /* compute the cdp index */
1960         cdp_idx = rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
1961 #ifdef DEBUG
1962         fprintf(stderr, "  -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1963                 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,
1964                 rrd_file->pos, rrd->rra_def[rra_idx].cf_nam);
1965 #endif
1966         if (*pcdp_summary != NULL) {
1967             iv.u_val = rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
1968             /* append info to the return hash */
1969             *pcdp_summary = info_push(*pcdp_summary,
1970                                       sprintf_alloc("[%d]RRA[%s][%lu]DS[%s]",
1971                                                     rra_time,
1972                                                     rrd->rra_def[rra_idx].
1973                                                     cf_nam,
1974                                                     rrd->rra_def[rra_idx].
1975                                                     pdp_cnt,
1976                                                     rrd->ds_def[ds_idx].
1977                                                     ds_nam), RD_I_VAL, iv);
1978         }
1979         if (rrd_write(rrd_file,
1980                       &(rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].
1981                         u_val), sizeof(rrd_value_t)) != sizeof(rrd_value_t)) {
1982             rrd_set_error("writing rrd: %s", rrd_strerror(errno));
1983             return -1;
1984         }
1985         *rra_current += sizeof(rrd_value_t);
1986     }
1987     return 0;
1990 /*
1991  * Call apply_smoother for all DEVSEASONAL and SEASONAL RRAs.
1992  *
1993  * Returns 0 on success, -1 otherwise
1994  */
1995 static int smooth_all_rras(
1996     rrd_t *rrd,
1997     rrd_file_t *rrd_file,
1998     unsigned long rra_begin)
2000     unsigned long rra_start = rra_begin;
2001     unsigned long rra_idx;
2003     for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; ++rra_idx) {
2004         if (cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_DEVSEASONAL ||
2005             cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_SEASONAL) {
2006 #ifdef DEBUG
2007             fprintf(stderr, "Running smoother for rra %lu\n", rra_idx);
2008 #endif
2009             apply_smoother(rrd, rra_idx, rra_start, rrd_file);
2010             if (rrd_test_error())
2011                 return -1;
2012         }
2013         rra_start += rrd->rra_def[rra_idx].row_cnt
2014             * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
2015     }
2016     return 0;
2019 #ifndef HAVE_MMAP
2020 /*
2021  * Flush changes to disk (unless we're using mmap)
2022  *
2023  * Returns 0 on success, -1 otherwise
2024  */
2025 static int write_changes_to_disk(
2026     rrd_t *rrd,
2027     rrd_file_t *rrd_file,
2028     int version)
2030     /* we just need to write back the live header portion now */
2031     if (rrd_seek(rrd_file, (sizeof(stat_head_t)
2032                             + sizeof(ds_def_t) * rrd->stat_head->ds_cnt
2033                             + sizeof(rra_def_t) * rrd->stat_head->rra_cnt),
2034                  SEEK_SET) != 0) {
2035         rrd_set_error("seek rrd for live header writeback");
2036         return -1;
2037     }
2038     if (version >= 3) {
2039         if (rrd_write(rrd_file, rrd->live_head,
2040                       sizeof(live_head_t) * 1) != sizeof(live_head_t) * 1) {
2041             rrd_set_error("rrd_write live_head to rrd");
2042             return -1;
2043         }
2044     } else {
2045         if (rrd_write(rrd_file, &rrd->live_head->last_up,
2046                       sizeof(time_t) * 1) != sizeof(time_t) * 1) {
2047             rrd_set_error("rrd_write live_head to rrd");
2048             return -1;
2049         }
2050     }
2053     if (rrd_write(rrd_file, rrd->pdp_prep,
2054                   sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)
2055         != (ssize_t) (sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)) {
2056         rrd_set_error("rrd_write pdp_prep to rrd");
2057         return -1;
2058     }
2060     if (rrd_write(rrd_file, rrd->cdp_prep,
2061                   sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2062                   rrd->stat_head->ds_cnt)
2063         != (ssize_t) (sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2064                       rrd->stat_head->ds_cnt)) {
2066         rrd_set_error("rrd_write cdp_prep to rrd");
2067         return -1;
2068     }
2070     if (rrd_write(rrd_file, rrd->rra_ptr,
2071                   sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)
2072         != (ssize_t) (sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)) {
2073         rrd_set_error("rrd_write rra_ptr to rrd");
2074         return -1;
2075     }
2076     return 0;
2078 #endif