Code

complete segfault fix for second axis %s format begun in r2123 (thanks Martin Pelikan...
[rrdtool.git] / src / rrd_update.c
2 /*****************************************************************************
3  * RRDtool 1.3.9  Copyright by Tobi Oetiker, 1997-2009
4  *****************************************************************************
5  * rrd_update.c  RRD Update Function
6  *****************************************************************************
7  * $Id$
8  *****************************************************************************/
10 #include "rrd_tool.h"
12 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
13 #include <sys/locking.h>
14 #include <sys/stat.h>
15 #include <io.h>
16 #endif
18 #include <locale.h>
20 #ifdef WIN32
21 #include <stdlib.h>
22 #endif
24 #include "rrd_hw.h"
25 #include "rrd_rpncalc.h"
27 #include "rrd_is_thread_safe.h"
28 #include "unused.h"
30 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
31 /*
32  * WIN32 does not have gettimeofday     and struct timeval. This is a quick and dirty
33  * replacement.
34  */
35 #include <sys/timeb.h>
37 #ifndef __MINGW32__
38 struct timeval {
39     time_t    tv_sec;   /* seconds */
40     long      tv_usec;  /* microseconds */
41 };
42 #endif
44 struct __timezone {
45     int       tz_minuteswest;   /* minutes W of Greenwich */
46     int       tz_dsttime;   /* type of dst correction */
47 };
49 static int gettimeofday(
50     struct timeval *t,
51     struct __timezone *tz)
52 {
54     struct _timeb current_time;
56     _ftime(&current_time);
58     t->tv_sec = current_time.time;
59     t->tv_usec = current_time.millitm * 1000;
61     return 0;
62 }
64 #endif
66 /* FUNCTION PROTOTYPES */
68 int       rrd_update_r(
69     const char *filename,
70     const char *tmplt,
71     int argc,
72     const char **argv);
73 int       _rrd_update(
74     const char *filename,
75     const char *tmplt,
76     int argc,
77     const char **argv,
78     rrd_info_t *);
80 static int allocate_data_structures(
81     rrd_t *rrd,
82     char ***updvals,
83     rrd_value_t **pdp_temp,
84     const char *tmplt,
85     long **tmpl_idx,
86     unsigned long *tmpl_cnt,
87     unsigned long **rra_step_cnt,
88     unsigned long **skip_update,
89     rrd_value_t **pdp_new);
91 static int parse_template(
92     rrd_t *rrd,
93     const char *tmplt,
94     unsigned long *tmpl_cnt,
95     long *tmpl_idx);
97 static int process_arg(
98     char *step_start,
99     rrd_t *rrd,
100     rrd_file_t *rrd_file,
101     unsigned long rra_begin,
102     time_t *current_time,
103     unsigned long *current_time_usec,
104     rrd_value_t *pdp_temp,
105     rrd_value_t *pdp_new,
106     unsigned long *rra_step_cnt,
107     char **updvals,
108     long *tmpl_idx,
109     unsigned long tmpl_cnt,
110     rrd_info_t ** pcdp_summary,
111     int version,
112     unsigned long *skip_update,
113     int *schedule_smooth);
115 static int parse_ds(
116     rrd_t *rrd,
117     char **updvals,
118     long *tmpl_idx,
119     char *input,
120     unsigned long tmpl_cnt,
121     time_t *current_time,
122     unsigned long *current_time_usec,
123     int version);
125 static int get_time_from_reading(
126     rrd_t *rrd,
127     char timesyntax,
128     char **updvals,
129     time_t *current_time,
130     unsigned long *current_time_usec,
131     int version);
133 static int update_pdp_prep(
134     rrd_t *rrd,
135     char **updvals,
136     rrd_value_t *pdp_new,
137     double interval);
139 static int calculate_elapsed_steps(
140     rrd_t *rrd,
141     unsigned long current_time,
142     unsigned long current_time_usec,
143     double interval,
144     double *pre_int,
145     double *post_int,
146     unsigned long *proc_pdp_cnt);
148 static void simple_update(
149     rrd_t *rrd,
150     double interval,
151     rrd_value_t *pdp_new);
153 static int process_all_pdp_st(
154     rrd_t *rrd,
155     double interval,
156     double pre_int,
157     double post_int,
158     unsigned long elapsed_pdp_st,
159     rrd_value_t *pdp_new,
160     rrd_value_t *pdp_temp);
162 static int process_pdp_st(
163     rrd_t *rrd,
164     unsigned long ds_idx,
165     double interval,
166     double pre_int,
167     double post_int,
168     long diff_pdp_st,
169     rrd_value_t *pdp_new,
170     rrd_value_t *pdp_temp);
172 static int update_all_cdp_prep(
173     rrd_t *rrd,
174     unsigned long *rra_step_cnt,
175     unsigned long rra_begin,
176     rrd_file_t *rrd_file,
177     unsigned long elapsed_pdp_st,
178     unsigned long proc_pdp_cnt,
179     rrd_value_t **last_seasonal_coef,
180     rrd_value_t **seasonal_coef,
181     rrd_value_t *pdp_temp,
182     unsigned long *skip_update,
183     int *schedule_smooth);
185 static int do_schedule_smooth(
186     rrd_t *rrd,
187     unsigned long rra_idx,
188     unsigned long elapsed_pdp_st);
190 static int update_cdp_prep(
191     rrd_t *rrd,
192     unsigned long elapsed_pdp_st,
193     unsigned long start_pdp_offset,
194     unsigned long *rra_step_cnt,
195     int rra_idx,
196     rrd_value_t *pdp_temp,
197     rrd_value_t *last_seasonal_coef,
198     rrd_value_t *seasonal_coef,
199     int current_cf);
201 static void update_cdp(
202     unival *scratch,
203     int current_cf,
204     rrd_value_t pdp_temp_val,
205     unsigned long rra_step_cnt,
206     unsigned long elapsed_pdp_st,
207     unsigned long start_pdp_offset,
208     unsigned long pdp_cnt,
209     rrd_value_t xff,
210     int i,
211     int ii);
213 static void initialize_cdp_val(
214     unival *scratch,
215     int current_cf,
216     rrd_value_t pdp_temp_val,
217     unsigned long elapsed_pdp_st,
218     unsigned long start_pdp_offset,
219     unsigned long pdp_cnt);
221 static void reset_cdp(
222     rrd_t *rrd,
223     unsigned long elapsed_pdp_st,
224     rrd_value_t *pdp_temp,
225     rrd_value_t *last_seasonal_coef,
226     rrd_value_t *seasonal_coef,
227     int rra_idx,
228     int ds_idx,
229     int cdp_idx,
230     enum cf_en current_cf);
232 static rrd_value_t initialize_average_carry_over(
233     rrd_value_t pdp_temp_val,
234     unsigned long elapsed_pdp_st,
235     unsigned long start_pdp_offset,
236     unsigned long pdp_cnt);
238 static rrd_value_t calculate_cdp_val(
239     rrd_value_t cdp_val,
240     rrd_value_t pdp_temp_val,
241     unsigned long elapsed_pdp_st,
242     int current_cf,
243     int i,
244     int ii);
246 static int update_aberrant_cdps(
247     rrd_t *rrd,
248     rrd_file_t *rrd_file,
249     unsigned long rra_begin,
250     unsigned long elapsed_pdp_st,
251     rrd_value_t *pdp_temp,
252     rrd_value_t **seasonal_coef);
254 static int write_to_rras(
255     rrd_t *rrd,
256     rrd_file_t *rrd_file,
257     unsigned long *rra_step_cnt,
258     unsigned long rra_begin,
259     time_t current_time,
260     unsigned long *skip_update,
261     rrd_info_t ** pcdp_summary);
263 static int write_RRA_row(
264     rrd_file_t *rrd_file,
265     rrd_t *rrd,
266     unsigned long rra_idx,
267     unsigned short CDP_scratch_idx,
268     rrd_info_t ** pcdp_summary,
269     time_t rra_time);
271 static int smooth_all_rras(
272     rrd_t *rrd,
273     rrd_file_t *rrd_file,
274     unsigned long rra_begin);
276 #ifndef HAVE_MMAP
277 static int write_changes_to_disk(
278     rrd_t *rrd,
279     rrd_file_t *rrd_file,
280     int version);
281 #endif
283 /*
284  * normalize time as returned by gettimeofday. usec part must
285  * be always >= 0
286  */
287 static inline void normalize_time(
288     struct timeval *t)
290     if (t->tv_usec < 0) {
291         t->tv_sec--;
292         t->tv_usec += 1e6L;
293     }
296 /*
297  * Sets current_time and current_time_usec based on the current time.
298  * current_time_usec is set to 0 if the version number is 1 or 2.
299  */
300 static inline void initialize_time(
301     time_t *current_time,
302     unsigned long *current_time_usec,
303     int version)
305     struct timeval tmp_time;    /* used for time conversion */
307     gettimeofday(&tmp_time, 0);
308     normalize_time(&tmp_time);
309     *current_time = tmp_time.tv_sec;
310     if (version >= 3) {
311         *current_time_usec = tmp_time.tv_usec;
312     } else {
313         *current_time_usec = 0;
314     }
317 #define IFDNAN(X,Y) (isnan(X) ? (Y) : (X));
319 rrd_info_t *rrd_update_v(
320     int argc,
321     char **argv)
323     char     *tmplt = NULL;
324     rrd_info_t *result = NULL;
325     rrd_infoval_t rc;
326     struct option long_options[] = {
327         {"template", required_argument, 0, 't'},
328         {0, 0, 0, 0}
329     };
331     rc.u_int = -1;
332     optind = 0;
333     opterr = 0;         /* initialize getopt */
335     while (1) {
336         int       option_index = 0;
337         int       opt;
339         opt = getopt_long(argc, argv, "t:", long_options, &option_index);
341         if (opt == EOF)
342             break;
344         switch (opt) {
345         case 't':
346             tmplt = optarg;
347             break;
349         case '?':
350             rrd_set_error("unknown option '%s'", argv[optind - 1]);
351             goto end_tag;
352         }
353     }
355     /* need at least 2 arguments: filename, data. */
356     if (argc - optind < 2) {
357         rrd_set_error("Not enough arguments");
358         goto end_tag;
359     }
360     rc.u_int = 0;
361     result = rrd_info_push(NULL, sprintf_alloc("return_value"), RD_I_INT, rc);
362     rc.u_int = _rrd_update(argv[optind], tmplt,
363                            argc - optind - 1,
364                            (const char **) (argv + optind + 1), result);
365     result->value.u_int = rc.u_int;
366   end_tag:
367     return result;
370 int rrd_update(
371     int argc,
372     char **argv)
374     struct option long_options[] = {
375         {"template", required_argument, 0, 't'},
376         {0, 0, 0, 0}
377     };
378     int       option_index = 0;
379     int       opt;
380     char     *tmplt = NULL;
381     int       rc = -1;
383     optind = 0;
384     opterr = 0;         /* initialize getopt */
386     while (1) {
387         opt = getopt_long(argc, argv, "t:", long_options, &option_index);
389         if (opt == EOF)
390             break;
392         switch (opt) {
393         case 't':
394             tmplt = strdup(optarg);
395             break;
397         case '?':
398             rrd_set_error("unknown option '%s'", argv[optind - 1]);
399             goto out;
400         }
401     }
403     /* need at least 2 arguments: filename, data. */
404     if (argc - optind < 2) {
405         rrd_set_error("Not enough arguments");
406         goto out;
407     }
409     rc = rrd_update_r(argv[optind], tmplt,
410                       argc - optind - 1, (const char **) (argv + optind + 1));
411   out:
412     free(tmplt);
413     return rc;
416 int rrd_update_r(
417     const char *filename,
418     const char *tmplt,
419     int argc,
420     const char **argv)
422     return _rrd_update(filename, tmplt, argc, argv, NULL);
425 int _rrd_update(
426     const char *filename,
427     const char *tmplt,
428     int argc,
429     const char **argv,
430     rrd_info_t * pcdp_summary)
433     int       arg_i = 2;
435     unsigned long rra_begin;    /* byte pointer to the rra
436                                  * area in the rrd file.  this
437                                  * pointer never changes value */
438     rrd_value_t *pdp_new;   /* prepare the incoming data to be added 
439                              * to the existing entry */
440     rrd_value_t *pdp_temp;  /* prepare the pdp values to be added 
441                              * to the cdp values */
443     long     *tmpl_idx; /* index representing the settings
444                          * transported by the tmplt index */
445     unsigned long tmpl_cnt = 2; /* time and data */
446     rrd_t     rrd;
447     time_t    current_time = 0;
448     unsigned long current_time_usec = 0;    /* microseconds part of current time */
449     char    **updvals;
450     int       schedule_smooth = 0;
452     /* number of elapsed PDP steps since last update */
453     unsigned long *rra_step_cnt = NULL;
455     int       version;  /* rrd version */
456     rrd_file_t *rrd_file;
457     char     *arg_copy; /* for processing the argv */
458     unsigned long *skip_update; /* RRAs to advance but not write */
460     /* need at least 1 arguments: data. */
461     if (argc < 1) {
462         rrd_set_error("Not enough arguments");
463         goto err_out;
464     }
466     if ((rrd_file = rrd_open(filename, &rrd, RRD_READWRITE)) == NULL) {
467         goto err_free;
468     }
469     /* We are now at the beginning of the rra's */
470     rra_begin = rrd_file->header_len;
472     version = atoi(rrd.stat_head->version);
474     initialize_time(&current_time, &current_time_usec, version);
476     /* get exclusive lock to whole file.
477      * lock gets removed when we close the file.
478      */
479     if (rrd_lock(rrd_file) != 0) {
480         rrd_set_error("could not lock RRD");
481         goto err_close;
482     }
484     if (allocate_data_structures(&rrd, &updvals,
485                                  &pdp_temp, tmplt, &tmpl_idx, &tmpl_cnt,
486                                  &rra_step_cnt, &skip_update,
487                                  &pdp_new) == -1) {
488         goto err_close;
489     }
491     /* loop through the arguments. */
492     for (arg_i = 0; arg_i < argc; arg_i++) {
493         if ((arg_copy = strdup(argv[arg_i])) == NULL) {
494             rrd_set_error("failed duplication argv entry");
495             break;
496         }
497         if (process_arg(arg_copy, &rrd, rrd_file, rra_begin,
498                         &current_time, &current_time_usec, pdp_temp, pdp_new,
499                         rra_step_cnt, updvals, tmpl_idx, tmpl_cnt,
500                         &pcdp_summary, version, skip_update,
501                         &schedule_smooth) == -1) {
502             if (rrd_test_error()) { /* Should have error string always here */
503                 char     *save_error;
505                 /* Prepend file name to error message */
506                 if ((save_error = strdup(rrd_get_error())) != NULL) {
507                     rrd_set_error("%s: %s", filename, save_error);
508                     free(save_error);
509                 }
510             }
511             free(arg_copy);
512             break;
513         }
514         free(arg_copy);
515     }
517     free(rra_step_cnt);
519     /* if we got here and if there is an error and if the file has not been
520      * written to, then close things up and return. */
521     if (rrd_test_error()) {
522         goto err_free_structures;
523     }
524 #ifndef HAVE_MMAP
525     if (write_changes_to_disk(&rrd, rrd_file, version) == -1) {
526         goto err_free_structures;
527     }
528 #endif
530     /* calling the smoothing code here guarantees at most one smoothing
531      * operation per rrd_update call. Unfortunately, it is possible with bulk
532      * updates, or a long-delayed update for smoothing to occur off-schedule.
533      * This really isn't critical except during the burn-in cycles. */
534     if (schedule_smooth) {
535         smooth_all_rras(&rrd, rrd_file, rra_begin);
536     }
538 /*    rrd_dontneed(rrd_file,&rrd); */
539     rrd_free(&rrd);
540     rrd_close(rrd_file);
542     free(pdp_new);
543     free(tmpl_idx);
544     free(pdp_temp);
545     free(skip_update);
546     free(updvals);
547     return 0;
549   err_free_structures:
550     free(pdp_new);
551     free(tmpl_idx);
552     free(pdp_temp);
553     free(skip_update);
554     free(updvals);
555   err_close:
556     rrd_close(rrd_file);
557   err_free:
558     rrd_free(&rrd);
559   err_out:
560     return -1;
563 /*
564  * get exclusive lock to whole file.
565  * lock gets removed when we close the file
566  *
567  * returns 0 on success
568  */
569 int rrd_lock(
570     rrd_file_t *file)
572     int       rcstat;
574     {
575 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
576         struct _stat st;
578         if (_fstat(file->fd, &st) == 0) {
579             rcstat = _locking(file->fd, _LK_NBLCK, st.st_size);
580         } else {
581             rcstat = -1;
582         }
583 #else
584         struct flock lock;
586         lock.l_type = F_WRLCK;  /* exclusive write lock */
587         lock.l_len = 0; /* whole file */
588         lock.l_start = 0;   /* start of file */
589         lock.l_whence = SEEK_SET;   /* end of file */
591         rcstat = fcntl(file->fd, F_SETLK, &lock);
592 #endif
593     }
595     return (rcstat);
598 /*
599  * Allocate some important arrays used, and initialize the template.
600  *
601  * When it returns, either all of the structures are allocated
602  * or none of them are.
603  *
604  * Returns 0 on success, -1 on error.
605  */
606 static int allocate_data_structures(
607     rrd_t *rrd,
608     char ***updvals,
609     rrd_value_t **pdp_temp,
610     const char *tmplt,
611     long **tmpl_idx,
612     unsigned long *tmpl_cnt,
613     unsigned long **rra_step_cnt,
614     unsigned long **skip_update,
615     rrd_value_t **pdp_new)
617     unsigned  i, ii;
618     if ((*updvals = (char **) malloc(sizeof(char *)
619                                      * (rrd->stat_head->ds_cnt + 1))) == NULL) {
620         rrd_set_error("allocating updvals pointer array.");
621         return -1;
622     }
623     if ((*pdp_temp = (rrd_value_t *) malloc(sizeof(rrd_value_t)
624                                             * rrd->stat_head->ds_cnt)) ==
625         NULL) {
626         rrd_set_error("allocating pdp_temp.");
627         goto err_free_updvals;
628     }
629     if ((*skip_update = (unsigned long *) malloc(sizeof(unsigned long)
630                                                  *
631                                                  rrd->stat_head->rra_cnt)) ==
632         NULL) {
633         rrd_set_error("allocating skip_update.");
634         goto err_free_pdp_temp;
635     }
636     if ((*tmpl_idx = (long *) malloc(sizeof(unsigned long)
637                                      * (rrd->stat_head->ds_cnt + 1))) == NULL) {
638         rrd_set_error("allocating tmpl_idx.");
639         goto err_free_skip_update;
640     }
641     if ((*rra_step_cnt = (unsigned long *) malloc(sizeof(unsigned long)
642                                                   *
643                                                   (rrd->stat_head->
644                                                    rra_cnt))) == NULL) {
645         rrd_set_error("allocating rra_step_cnt.");
646         goto err_free_tmpl_idx;
647     }
649     /* initialize tmplt redirector */
650     /* default config example (assume DS 1 is a CDEF DS)
651        tmpl_idx[0] -> 0; (time)
652        tmpl_idx[1] -> 1; (DS 0)
653        tmpl_idx[2] -> 3; (DS 2)
654        tmpl_idx[3] -> 4; (DS 3) */
655     (*tmpl_idx)[0] = 0; /* time */
656     for (i = 1, ii = 1; i <= rrd->stat_head->ds_cnt; i++) {
657         if (dst_conv(rrd->ds_def[i - 1].dst) != DST_CDEF)
658             (*tmpl_idx)[ii++] = i;
659     }
660     *tmpl_cnt = ii;
662     if (tmplt != NULL) {
663         if (parse_template(rrd, tmplt, tmpl_cnt, *tmpl_idx) == -1) {
664             goto err_free_rra_step_cnt;
665         }
666     }
668     if ((*pdp_new = (rrd_value_t *) malloc(sizeof(rrd_value_t)
669                                            * rrd->stat_head->ds_cnt)) == NULL) {
670         rrd_set_error("allocating pdp_new.");
671         goto err_free_rra_step_cnt;
672     }
674     return 0;
676   err_free_rra_step_cnt:
677     free(*rra_step_cnt);
678   err_free_tmpl_idx:
679     free(*tmpl_idx);
680   err_free_skip_update:
681     free(*skip_update);
682   err_free_pdp_temp:
683     free(*pdp_temp);
684   err_free_updvals:
685     free(*updvals);
686     return -1;
689 /*
690  * Parses tmplt and puts an ordered list of DS's into tmpl_idx.
691  *
692  * Returns 0 on success.
693  */
694 static int parse_template(
695     rrd_t *rrd,
696     const char *tmplt,
697     unsigned long *tmpl_cnt,
698     long *tmpl_idx)
700     char     *dsname, *tmplt_copy;
701     unsigned int tmpl_len, i;
702     int       ret = 0;
704     *tmpl_cnt = 1;      /* the first entry is the time */
706     /* we should work on a writeable copy here */
707     if ((tmplt_copy = strdup(tmplt)) == NULL) {
708         rrd_set_error("error copying tmplt '%s'", tmplt);
709         ret = -1;
710         goto out;
711     }
713     dsname = tmplt_copy;
714     tmpl_len = strlen(tmplt_copy);
715     for (i = 0; i <= tmpl_len; i++) {
716         if (tmplt_copy[i] == ':' || tmplt_copy[i] == '\0') {
717             tmplt_copy[i] = '\0';
718             if (*tmpl_cnt > rrd->stat_head->ds_cnt) {
719                 rrd_set_error("tmplt contains more DS definitions than RRD");
720                 ret = -1;
721                 goto out_free_tmpl_copy;
722             }
723             if ((tmpl_idx[(*tmpl_cnt)++] = ds_match(rrd, dsname) + 1) == 0) {
724                 rrd_set_error("unknown DS name '%s'", dsname);
725                 ret = -1;
726                 goto out_free_tmpl_copy;
727             }
728             /* go to the next entry on the tmplt_copy */
729             if (i < tmpl_len)
730                 dsname = &tmplt_copy[i + 1];
731         }
732     }
733   out_free_tmpl_copy:
734     free(tmplt_copy);
735   out:
736     return ret;
739 /*
740  * Parse an update string, updates the primary data points (PDPs)
741  * and consolidated data points (CDPs), and writes changes to the RRAs.
742  *
743  * Returns 0 on success, -1 on error.
744  */
745 static int process_arg(
746     char *step_start,
747     rrd_t *rrd,
748     rrd_file_t *rrd_file,
749     unsigned long rra_begin,
750     time_t *current_time,
751     unsigned long *current_time_usec,
752     rrd_value_t *pdp_temp,
753     rrd_value_t *pdp_new,
754     unsigned long *rra_step_cnt,
755     char **updvals,
756     long *tmpl_idx,
757     unsigned long tmpl_cnt,
758     rrd_info_t ** pcdp_summary,
759     int version,
760     unsigned long *skip_update,
761     int *schedule_smooth)
763     rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
765     /* a vector of future Holt-Winters seasonal coefs */
766     unsigned long elapsed_pdp_st;
768     double    interval, pre_int, post_int;  /* interval between this and
769                                              * the last run */
770     unsigned long proc_pdp_cnt;
772     if (parse_ds(rrd, updvals, tmpl_idx, step_start, tmpl_cnt,
773                  current_time, current_time_usec, version) == -1) {
774         return -1;
775     }
777     interval = (double) (*current_time - rrd->live_head->last_up)
778         + (double) ((long) *current_time_usec -
779                     (long) rrd->live_head->last_up_usec) / 1e6f;
781     /* process the data sources and update the pdp_prep 
782      * area accordingly */
783     if (update_pdp_prep(rrd, updvals, pdp_new, interval) == -1) {
784         return -1;
785     }
787     elapsed_pdp_st = calculate_elapsed_steps(rrd,
788                                              *current_time,
789                                              *current_time_usec, interval,
790                                              &pre_int, &post_int,
791                                              &proc_pdp_cnt);
793     /* has a pdp_st moment occurred since the last run ? */
794     if (elapsed_pdp_st == 0) {
795         /* no we have not passed a pdp_st moment. therefore update is simple */
796         simple_update(rrd, interval, pdp_new);
797     } else {
798         /* an pdp_st has occurred. */
799         if (process_all_pdp_st(rrd, interval,
800                                pre_int, post_int,
801                                elapsed_pdp_st, pdp_new, pdp_temp) == -1) {
802             return -1;
803         }
804         if (update_all_cdp_prep(rrd, rra_step_cnt,
805                                 rra_begin, rrd_file,
806                                 elapsed_pdp_st,
807                                 proc_pdp_cnt,
808                                 &last_seasonal_coef,
809                                 &seasonal_coef,
810                                 pdp_temp,
811                                 skip_update, schedule_smooth) == -1) {
812             goto err_free_coefficients;
813         }
814         if (update_aberrant_cdps(rrd, rrd_file, rra_begin,
815                                  elapsed_pdp_st, pdp_temp,
816                                  &seasonal_coef) == -1) {
817             goto err_free_coefficients;
818         }
819         if (write_to_rras(rrd, rrd_file, rra_step_cnt, rra_begin,
820                           *current_time, skip_update,
821                           pcdp_summary) == -1) {
822             goto err_free_coefficients;
823         }
824     }                   /* endif a pdp_st has occurred */
825     rrd->live_head->last_up = *current_time;
826     rrd->live_head->last_up_usec = *current_time_usec;
828     if (version < 3) {
829         *rrd->legacy_last_up = rrd->live_head->last_up;
830     }
831     free(seasonal_coef);
832     free(last_seasonal_coef);
833     return 0;
835   err_free_coefficients:
836     free(seasonal_coef);
837     free(last_seasonal_coef);
838     return -1;
841 /*
842  * Parse a DS string (time + colon-separated values), storing the
843  * results in current_time, current_time_usec, and updvals.
844  *
845  * Returns 0 on success, -1 on error.
846  */
847 static int parse_ds(
848     rrd_t *rrd,
849     char **updvals,
850     long *tmpl_idx,
851     char *input,
852     unsigned long tmpl_cnt,
853     time_t *current_time,
854     unsigned long *current_time_usec,
855     int version)
857     char     *p;
858     unsigned long i;
859     char      timesyntax;
861     updvals[0] = input;
862     /* initialize all ds input to unknown except the first one
863        which has always got to be set */
864     for (i = 1; i <= rrd->stat_head->ds_cnt; i++)
865         updvals[i] = "U";
867     /* separate all ds elements; first must be examined separately
868        due to alternate time syntax */
869     if ((p = strchr(input, '@')) != NULL) {
870         timesyntax = '@';
871     } else if ((p = strchr(input, ':')) != NULL) {
872         timesyntax = ':';
873     } else {
874         rrd_set_error("expected timestamp not found in data source from %s",
875                       input);
876         return -1;
877     }
878     *p = '\0';
879     i = 1;
880     updvals[tmpl_idx[i++]] = p + 1;
881     while (*(++p)) {
882         if (*p == ':') {
883             *p = '\0';
884             if (i < tmpl_cnt) {
885                 updvals[tmpl_idx[i++]] = p + 1;
886             }
887             else {
888                 rrd_set_error("found extra data on update argument: %s",p+1);
889                 return -1;
890             }                
891         }
892     }
894     if (i != tmpl_cnt) {
895         rrd_set_error("expected %lu data source readings (got %lu) from %s",
896                       tmpl_cnt - 1, i - 1, input);
897         return -1;
898     }
900     if (get_time_from_reading(rrd, timesyntax, updvals,
901                               current_time, current_time_usec,
902                               version) == -1) {
903         return -1;
904     }
905     return 0;
908 /*
909  * Parse the time in a DS string, store it in current_time and 
910  * current_time_usec and verify that it's later than the last
911  * update for this DS.
912  *
913  * Returns 0 on success, -1 on error.
914  */
915 static int get_time_from_reading(
916     rrd_t *rrd,
917     char timesyntax,
918     char **updvals,
919     time_t *current_time,
920     unsigned long *current_time_usec,
921     int version)
923     double    tmp;
924     char     *parsetime_error = NULL;
925     char     *old_locale;
926     rrd_time_value_t ds_tv;
927     struct timeval tmp_time;    /* used for time conversion */
929     /* get the time from the reading ... handle N */
930     if (timesyntax == '@') {    /* at-style */
931         if ((parsetime_error = rrd_parsetime(updvals[0], &ds_tv))) {
932             rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error);
933             return -1;
934         }
935         if (ds_tv.type == RELATIVE_TO_END_TIME ||
936             ds_tv.type == RELATIVE_TO_START_TIME) {
937             rrd_set_error("specifying time relative to the 'start' "
938                           "or 'end' makes no sense here: %s", updvals[0]);
939             return -1;
940         }
941         *current_time = mktime(&ds_tv.tm) +ds_tv.offset;
942         *current_time_usec = 0; /* FIXME: how to handle usecs here ? */
943     } else if (strcmp(updvals[0], "N") == 0) {
944         gettimeofday(&tmp_time, 0);
945         normalize_time(&tmp_time);
946         *current_time = tmp_time.tv_sec;
947         *current_time_usec = tmp_time.tv_usec;
948     } else {
949         old_locale = setlocale(LC_NUMERIC, NULL);
950         setlocale(LC_NUMERIC, "C");
951         errno = 0;
952         tmp = strtod(updvals[0], 0);
953         if (errno > 0) {
954             rrd_set_error("converting '%s' to float: %s",
955                 updvals[0], rrd_strerror(errno));
956             return -1;
957         };
958         setlocale(LC_NUMERIC, old_locale);
959         if (tmp < 0.0){
960             gettimeofday(&tmp_time, 0);
961             tmp = (double)tmp_time.tv_sec + (double)tmp_time.tv_usec * 1e-6f + tmp;
962         }
964         *current_time = floor(tmp);
965         *current_time_usec = (long) ((tmp - (double) *current_time) * 1e6f);
966     }
967     /* dont do any correction for old version RRDs */
968     if (version < 3)
969         *current_time_usec = 0;
971     if (*current_time < rrd->live_head->last_up ||
972         (*current_time == rrd->live_head->last_up &&
973          (long) *current_time_usec <= (long) rrd->live_head->last_up_usec)) {
974         rrd_set_error("illegal attempt to update using time %ld when "
975                       "last update time is %ld (minimum one second step)",
976                       *current_time, rrd->live_head->last_up);
977         return -1;
978     }
979     return 0;
982 /*
983  * Update pdp_new by interpreting the updvals according to the DS type
984  * (COUNTER, GAUGE, etc.).
985  *
986  * Returns 0 on success, -1 on error.
987  */
988 static int update_pdp_prep(
989     rrd_t *rrd,
990     char **updvals,
991     rrd_value_t *pdp_new,
992     double interval)
994     unsigned long ds_idx;
995     int       ii;
996     char     *endptr;   /* used in the conversion */
997     double    rate;
998     char     *old_locale;
999     enum dst_en dst_idx;
1001     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1002         dst_idx = dst_conv(rrd->ds_def[ds_idx].dst);
1004         /* make sure we do not build diffs with old last_ds values */
1005         if (rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt < interval) {
1006             strncpy(rrd->pdp_prep[ds_idx].last_ds, "U", LAST_DS_LEN - 1);
1007             rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
1008         }
1010         /* NOTE: DST_CDEF should never enter this if block, because
1011          * updvals[ds_idx+1][0] is initialized to 'U'; unless the caller
1012          * accidently specified a value for the DST_CDEF. To handle this case,
1013          * an extra check is required. */
1015         if ((updvals[ds_idx + 1][0] != 'U') &&
1016             (dst_idx != DST_CDEF) &&
1017             rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt >= interval) {
1018             rate = DNAN;
1020             /* pdp_new contains rate * time ... eg the bytes transferred during
1021              * the interval. Doing it this way saves a lot of math operations
1022              */
1023             switch (dst_idx) {
1024             case DST_COUNTER:
1025             case DST_DERIVE:
1026                 /* Check if this is a valid integer. `U' is already handled in
1027                  * another branch. */
1028                 for (ii = 0; updvals[ds_idx + 1][ii] != 0; ii++) {
1029                     if ((ii == 0) && (dst_idx == DST_DERIVE)
1030                             && (updvals[ds_idx + 1][ii] == '-'))
1031                         continue;
1033                     if ((updvals[ds_idx + 1][ii] < '0')
1034                             || (updvals[ds_idx + 1][ii] > '9')) {
1035                         rrd_set_error("not a simple %s integer: '%s'",
1036                                 (dst_idx == DST_DERIVE) ? "signed" : "unsigned",
1037                                 updvals[ds_idx + 1]);
1038                         return -1;
1039                     }
1040                 } /* for (ii = 0; updvals[ds_idx + 1][ii] != 0; ii++) */
1042                 if (rrd->pdp_prep[ds_idx].last_ds[0] != 'U') {
1043                     pdp_new[ds_idx] =
1044                         rrd_diff(updvals[ds_idx + 1],
1045                                  rrd->pdp_prep[ds_idx].last_ds);
1046                     if (dst_idx == DST_COUNTER) {
1047                         /* simple overflow catcher. This will fail
1048                          * terribly for non 32 or 64 bit counters
1049                          * ... are there any others in SNMP land?
1050                          */
1051                         if (pdp_new[ds_idx] < (double) 0.0)
1052                             pdp_new[ds_idx] += (double) 4294967296.0;   /* 2^32 */
1053                         if (pdp_new[ds_idx] < (double) 0.0)
1054                             pdp_new[ds_idx] += (double) 18446744069414584320.0; /* 2^64-2^32 */
1055                     }
1056                     rate = pdp_new[ds_idx] / interval;
1057                 } else {
1058                     pdp_new[ds_idx] = DNAN;
1059                 }
1060                 break;
1061             case DST_ABSOLUTE:
1062                 old_locale = setlocale(LC_NUMERIC, NULL);
1063                 setlocale(LC_NUMERIC, "C");
1064                 errno = 0;
1065                 pdp_new[ds_idx] = strtod(updvals[ds_idx + 1], &endptr);
1066                 if (errno > 0) {
1067                     rrd_set_error("converting '%s' to float: %s",
1068                                   updvals[ds_idx + 1], rrd_strerror(errno));
1069                     return -1;
1070                 };
1071                 setlocale(LC_NUMERIC, old_locale);
1072                 if (endptr[0] != '\0') {
1073                     rrd_set_error
1074                         ("conversion of '%s' to float not complete: tail '%s'",
1075                          updvals[ds_idx + 1], endptr);
1076                     return -1;
1077                 }
1078                 rate = pdp_new[ds_idx] / interval;
1079                 break;
1080             case DST_GAUGE:
1081                 old_locale = setlocale(LC_NUMERIC, NULL);
1082                 setlocale(LC_NUMERIC, "C");
1083                 errno = 0;
1084                 pdp_new[ds_idx] =
1085                     strtod(updvals[ds_idx + 1], &endptr) * interval;
1086                 if (errno) {
1087                     rrd_set_error("converting '%s' to float: %s",
1088                                   updvals[ds_idx + 1], rrd_strerror(errno));
1089                     return -1;
1090                 };
1091                 setlocale(LC_NUMERIC, old_locale);
1092                 if (endptr[0] != '\0') {
1093                     rrd_set_error
1094                         ("conversion of '%s' to float not complete: tail '%s'",
1095                          updvals[ds_idx + 1], endptr);
1096                     return -1;
1097                 }
1098                 rate = pdp_new[ds_idx] / interval;
1099                 break;
1100             default:
1101                 rrd_set_error("rrd contains unknown DS type : '%s'",
1102                               rrd->ds_def[ds_idx].dst);
1103                 return -1;
1104             }
1105             /* break out of this for loop if the error string is set */
1106             if (rrd_test_error()) {
1107                 return -1;
1108             }
1109             /* make sure pdp_temp is neither too large or too small
1110              * if any of these occur it becomes unknown ...
1111              * sorry folks ... */
1112             if (!isnan(rate) &&
1113                 ((!isnan(rrd->ds_def[ds_idx].par[DS_max_val].u_val) &&
1114                   rate > rrd->ds_def[ds_idx].par[DS_max_val].u_val) ||
1115                  (!isnan(rrd->ds_def[ds_idx].par[DS_min_val].u_val) &&
1116                   rate < rrd->ds_def[ds_idx].par[DS_min_val].u_val))) {
1117                 pdp_new[ds_idx] = DNAN;
1118             }
1119         } else {
1120             /* no news is news all the same */
1121             pdp_new[ds_idx] = DNAN;
1122         }
1125         /* make a copy of the command line argument for the next run */
1126 #ifdef DEBUG
1127         fprintf(stderr, "prep ds[%lu]\t"
1128                 "last_arg '%s'\t"
1129                 "this_arg '%s'\t"
1130                 "pdp_new %10.2f\n",
1131                 ds_idx, rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1132                 pdp_new[ds_idx]);
1133 #endif
1134         strncpy(rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1135                 LAST_DS_LEN - 1);
1136         rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
1137     }
1138     return 0;
1141 /*
1142  * How many PDP steps have elapsed since the last update? Returns the answer,
1143  * and stores the time between the last update and the last PDP in pre_time,
1144  * and the time between the last PDP and the current time in post_int.
1145  */
1146 static int calculate_elapsed_steps(
1147     rrd_t *rrd,
1148     unsigned long current_time,
1149     unsigned long current_time_usec,
1150     double interval,
1151     double *pre_int,
1152     double *post_int,
1153     unsigned long *proc_pdp_cnt)
1155     unsigned long proc_pdp_st;  /* which pdp_st was the last to be processed */
1156     unsigned long occu_pdp_st;  /* when was the pdp_st before the last update
1157                                  * time */
1158     unsigned long proc_pdp_age; /* how old was the data in the pdp prep area 
1159                                  * when it was last updated */
1160     unsigned long occu_pdp_age; /* how long ago was the last pdp_step time */
1162     /* when was the current pdp started */
1163     proc_pdp_age = rrd->live_head->last_up % rrd->stat_head->pdp_step;
1164     proc_pdp_st = rrd->live_head->last_up - proc_pdp_age;
1166     /* when did the last pdp_st occur */
1167     occu_pdp_age = current_time % rrd->stat_head->pdp_step;
1168     occu_pdp_st = current_time - occu_pdp_age;
1170     if (occu_pdp_st > proc_pdp_st) {
1171         /* OK we passed the pdp_st moment */
1172         *pre_int = (long) occu_pdp_st - rrd->live_head->last_up;    /* how much of the input data
1173                                                                      * occurred before the latest
1174                                                                      * pdp_st moment*/
1175         *pre_int -= ((double) rrd->live_head->last_up_usec) / 1e6f; /* adjust usecs */
1176         *post_int = occu_pdp_age;   /* how much after it */
1177         *post_int += ((double) current_time_usec) / 1e6f;   /* adjust usecs */
1178     } else {
1179         *pre_int = interval;
1180         *post_int = 0;
1181     }
1183     *proc_pdp_cnt = proc_pdp_st / rrd->stat_head->pdp_step;
1185 #ifdef DEBUG
1186     printf("proc_pdp_age %lu\t"
1187            "proc_pdp_st %lu\t"
1188            "occu_pfp_age %lu\t"
1189            "occu_pdp_st %lu\t"
1190            "int %lf\t"
1191            "pre_int %lf\t"
1192            "post_int %lf\n", proc_pdp_age, proc_pdp_st,
1193            occu_pdp_age, occu_pdp_st, interval, *pre_int, *post_int);
1194 #endif
1196     /* compute the number of elapsed pdp_st moments */
1197     return (occu_pdp_st - proc_pdp_st) / rrd->stat_head->pdp_step;
1200 /*
1201  * Increment the PDP values by the values in pdp_new, or else initialize them.
1202  */
1203 static void simple_update(
1204     rrd_t *rrd,
1205     double interval,
1206     rrd_value_t *pdp_new)
1208     int       i;
1210     for (i = 0; i < (signed) rrd->stat_head->ds_cnt; i++) {
1211         if (isnan(pdp_new[i])) {
1212             /* this is not really accurate if we use subsecond data arrival time
1213                should have thought of it when going subsecond resolution ...
1214                sorry next format change we will have it! */
1215             rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt +=
1216                 floor(interval);
1217         } else {
1218             if (isnan(rrd->pdp_prep[i].scratch[PDP_val].u_val)) {
1219                 rrd->pdp_prep[i].scratch[PDP_val].u_val = pdp_new[i];
1220             } else {
1221                 rrd->pdp_prep[i].scratch[PDP_val].u_val += pdp_new[i];
1222             }
1223         }
1224 #ifdef DEBUG
1225         fprintf(stderr,
1226                 "NO PDP  ds[%i]\t"
1227                 "value %10.2f\t"
1228                 "unkn_sec %5lu\n",
1229                 i,
1230                 rrd->pdp_prep[i].scratch[PDP_val].u_val,
1231                 rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
1232 #endif
1233     }
1236 /*
1237  * Call process_pdp_st for each DS.
1238  *
1239  * Returns 0 on success, -1 on error.
1240  */
1241 static int process_all_pdp_st(
1242     rrd_t *rrd,
1243     double interval,
1244     double pre_int,
1245     double post_int,
1246     unsigned long elapsed_pdp_st,
1247     rrd_value_t *pdp_new,
1248     rrd_value_t *pdp_temp)
1250     unsigned long ds_idx;
1252     /* in pdp_prep[].scratch[PDP_val].u_val we have collected
1253        rate*seconds which occurred up to the last run.
1254        pdp_new[] contains rate*seconds from the latest run.
1255        pdp_temp[] will contain the rate for cdp */
1257     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1258         if (process_pdp_st(rrd, ds_idx, interval, pre_int, post_int,
1259                            elapsed_pdp_st * rrd->stat_head->pdp_step,
1260                            pdp_new, pdp_temp) == -1) {
1261             return -1;
1262         }
1263 #ifdef DEBUG
1264         fprintf(stderr, "PDP UPD ds[%lu]\t"
1265                 "elapsed_pdp_st %lu\t"
1266                 "pdp_temp %10.2f\t"
1267                 "new_prep %10.2f\t"
1268                 "new_unkn_sec %5lu\n",
1269                 ds_idx,
1270                 elapsed_pdp_st,
1271                 pdp_temp[ds_idx],
1272                 rrd->pdp_prep[ds_idx].scratch[PDP_val].u_val,
1273                 rrd->pdp_prep[ds_idx].scratch[PDP_unkn_sec_cnt].u_cnt);
1274 #endif
1275     }
1276     return 0;
1279 /*
1280  * Process an update that occurs after one of the PDP moments.
1281  * Increments the PDP value, sets NAN if time greater than the
1282  * heartbeats have elapsed, processes CDEFs.
1283  *
1284  * Returns 0 on success, -1 on error.
1285  */
1286 static int process_pdp_st(
1287     rrd_t *rrd,
1288     unsigned long ds_idx,
1289     double interval,
1290     double pre_int,
1291     double post_int,
1292     long diff_pdp_st,   /* number of seconds in full steps passed since last update */
1293     rrd_value_t *pdp_new,
1294     rrd_value_t *pdp_temp)
1296     int       i;
1298     /* update pdp_prep to the current pdp_st. */
1299     double    pre_unknown = 0.0;
1300     unival   *scratch = rrd->pdp_prep[ds_idx].scratch;
1301     unsigned long mrhb = rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt;
1303     rpnstack_t rpnstack;    /* used for COMPUTE DS */
1305     rpnstack_init(&rpnstack);
1308     if (isnan(pdp_new[ds_idx])) {
1309         /* a final bit of unknown to be added before calculation
1310            we use a temporary variable for this so that we
1311            don't have to turn integer lines before using the value */
1312         pre_unknown = pre_int;
1313     } else {
1314         if (isnan(scratch[PDP_val].u_val)) {
1315             scratch[PDP_val].u_val = 0;
1316         }
1317         scratch[PDP_val].u_val += pdp_new[ds_idx] / interval * pre_int;
1318     }
1320     /* if too much of the pdp_prep is unknown we dump it */
1321     /* if the interval is larger thatn mrhb we get NAN */
1322     if ((interval > mrhb) ||
1323         (rrd->stat_head->pdp_step / 2.0 <
1324          (signed) scratch[PDP_unkn_sec_cnt].u_cnt)) {
1325         pdp_temp[ds_idx] = DNAN;
1326     } else {
1327         pdp_temp[ds_idx] = scratch[PDP_val].u_val /
1328             ((double) (diff_pdp_st - scratch[PDP_unkn_sec_cnt].u_cnt) -
1329              pre_unknown);
1330     }
1332     /* process CDEF data sources; remember each CDEF DS can
1333      * only reference other DS with a lower index number */
1334     if (dst_conv(rrd->ds_def[ds_idx].dst) == DST_CDEF) {
1335         rpnp_t   *rpnp;
1337         rpnp =
1338             rpn_expand((rpn_cdefds_t *) &(rrd->ds_def[ds_idx].par[DS_cdef]));
1339         if(rpnp == NULL) {
1340           rpnstack_free(&rpnstack);
1341           return -1;
1342         }
1343         /* substitute data values for OP_VARIABLE nodes */
1344         for (i = 0; rpnp[i].op != OP_END; i++) {
1345             if (rpnp[i].op == OP_VARIABLE) {
1346                 rpnp[i].op = OP_NUMBER;
1347                 rpnp[i].val = pdp_temp[rpnp[i].ptr];
1348             }
1349         }
1350         /* run the rpn calculator */
1351         if (rpn_calc(rpnp, &rpnstack, 0, pdp_temp, ds_idx) == -1) {
1352             free(rpnp);
1353             rpnstack_free(&rpnstack);
1354             return -1;
1355         }
1356         free(rpnp);
1357     }
1359     /* make pdp_prep ready for the next run */
1360     if (isnan(pdp_new[ds_idx])) {
1361         /* this is not realy accurate if we use subsecond data arival time
1362            should have thought of it when going subsecond resolution ...
1363            sorry next format change we will have it! */
1364         scratch[PDP_unkn_sec_cnt].u_cnt = floor(post_int);
1365         scratch[PDP_val].u_val = DNAN;
1366     } else {
1367         scratch[PDP_unkn_sec_cnt].u_cnt = 0;
1368         scratch[PDP_val].u_val = pdp_new[ds_idx] / interval * post_int;
1369     }
1370     rpnstack_free(&rpnstack);
1371     return 0;
1374 /*
1375  * Iterate over all the RRAs for a given DS and:
1376  * 1. Decide whether to schedule a smooth later
1377  * 2. Decide whether to skip updating SEASONAL and DEVSEASONAL
1378  * 3. Update the CDP
1379  *
1380  * Returns 0 on success, -1 on error
1381  */
1382 static int update_all_cdp_prep(
1383     rrd_t *rrd,
1384     unsigned long *rra_step_cnt,
1385     unsigned long rra_begin,
1386     rrd_file_t *rrd_file,
1387     unsigned long elapsed_pdp_st,
1388     unsigned long proc_pdp_cnt,
1389     rrd_value_t **last_seasonal_coef,
1390     rrd_value_t **seasonal_coef,
1391     rrd_value_t *pdp_temp,
1392     unsigned long *skip_update,
1393     int *schedule_smooth)
1395     unsigned long rra_idx;
1397     /* index into the CDP scratch array */
1398     enum cf_en current_cf;
1399     unsigned long rra_start;
1401     /* number of rows to be updated in an RRA for a data value. */
1402     unsigned long start_pdp_offset;
1404     rra_start = rra_begin;
1405     for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1406         current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1407         start_pdp_offset =
1408             rrd->rra_def[rra_idx].pdp_cnt -
1409             proc_pdp_cnt % rrd->rra_def[rra_idx].pdp_cnt;
1410         skip_update[rra_idx] = 0;
1411         if (start_pdp_offset <= elapsed_pdp_st) {
1412             rra_step_cnt[rra_idx] = (elapsed_pdp_st - start_pdp_offset) /
1413                 rrd->rra_def[rra_idx].pdp_cnt + 1;
1414         } else {
1415             rra_step_cnt[rra_idx] = 0;
1416         }
1418         if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1419             /* If this is a bulk update, we need to skip ahead in the seasonal arrays
1420              * so that they will be correct for the next observed value; note that for
1421              * the bulk update itself, no update will occur to DEVSEASONAL or SEASONAL;
1422              * futhermore, HWPREDICT and DEVPREDICT will be set to DNAN. */
1423             if (rra_step_cnt[rra_idx] > 1) {
1424                 skip_update[rra_idx] = 1;
1425                 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1426                                 elapsed_pdp_st, last_seasonal_coef);
1427                 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1428                                 elapsed_pdp_st + 1, seasonal_coef);
1429             }
1430             /* periodically run a smoother for seasonal effects */
1431             if (do_schedule_smooth(rrd, rra_idx, elapsed_pdp_st)) {
1432 #ifdef DEBUG
1433                 fprintf(stderr,
1434                         "schedule_smooth: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
1435                         rrd->rra_ptr[rra_idx].cur_row, elapsed_pdp_st,
1436                         rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].
1437                         u_cnt);
1438 #endif
1439                 *schedule_smooth = 1;
1440             }
1441         }
1442         if (rrd_test_error())
1443             return -1;
1445         if (update_cdp_prep
1446             (rrd, elapsed_pdp_st, start_pdp_offset, rra_step_cnt, rra_idx,
1447              pdp_temp, *last_seasonal_coef, *seasonal_coef,
1448              current_cf) == -1) {
1449             return -1;
1450         }
1451         rra_start +=
1452             rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt *
1453             sizeof(rrd_value_t);
1454     }
1455     return 0;
1458 /* 
1459  * Are we due for a smooth? Also increments our position in the burn-in cycle.
1460  */
1461 static int do_schedule_smooth(
1462     rrd_t *rrd,
1463     unsigned long rra_idx,
1464     unsigned long elapsed_pdp_st)
1466     unsigned long cdp_idx = rra_idx * (rrd->stat_head->ds_cnt);
1467     unsigned long cur_row = rrd->rra_ptr[rra_idx].cur_row;
1468     unsigned long row_cnt = rrd->rra_def[rra_idx].row_cnt;
1469     unsigned long seasonal_smooth_idx =
1470         rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].u_cnt;
1471     unsigned long *init_seasonal =
1472         &(rrd->cdp_prep[cdp_idx].scratch[CDP_init_seasonal].u_cnt);
1474     /* Need to use first cdp parameter buffer to track burnin (burnin requires
1475      * a specific smoothing schedule).  The CDP_init_seasonal parameter is
1476      * really an RRA level, not a data source within RRA level parameter, but
1477      * the rra_def is read only for rrd_update (not flushed to disk). */
1478     if (*init_seasonal > BURNIN_CYCLES) {
1479         /* someone has no doubt invented a trick to deal with this wrap around,
1480          * but at least this code is clear. */
1481         if (seasonal_smooth_idx > cur_row) {
1482             /* here elapsed_pdp_st = rra_step_cnt[rra_idx] because of 1-1 mapping
1483              * between PDP and CDP */
1484             return (cur_row + elapsed_pdp_st >= seasonal_smooth_idx);
1485         }
1486         /* can't rely on negative numbers because we are working with
1487          * unsigned values */
1488         return (cur_row + elapsed_pdp_st >= row_cnt
1489                 && cur_row + elapsed_pdp_st >= row_cnt + seasonal_smooth_idx);
1490     }
1491     /* mark off one of the burn-in cycles */
1492     return (cur_row + elapsed_pdp_st >= row_cnt && ++(*init_seasonal));
1495 /*
1496  * For a given RRA, iterate over the data sources and call the appropriate
1497  * consolidation function.
1498  *
1499  * Returns 0 on success, -1 on error.
1500  */
1501 static int update_cdp_prep(
1502     rrd_t *rrd,
1503     unsigned long elapsed_pdp_st,
1504     unsigned long start_pdp_offset,
1505     unsigned long *rra_step_cnt,
1506     int rra_idx,
1507     rrd_value_t *pdp_temp,
1508     rrd_value_t *last_seasonal_coef,
1509     rrd_value_t *seasonal_coef,
1510     int current_cf)
1512     unsigned long ds_idx, cdp_idx;
1514     /* update CDP_PREP areas */
1515     /* loop over data soures within each RRA */
1516     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1518         cdp_idx = rra_idx * rrd->stat_head->ds_cnt + ds_idx;
1520         if (rrd->rra_def[rra_idx].pdp_cnt > 1) {
1521             update_cdp(rrd->cdp_prep[cdp_idx].scratch, current_cf,
1522                        pdp_temp[ds_idx], rra_step_cnt[rra_idx],
1523                        elapsed_pdp_st, start_pdp_offset,
1524                        rrd->rra_def[rra_idx].pdp_cnt,
1525                        rrd->rra_def[rra_idx].par[RRA_cdp_xff_val].u_val,
1526                        rra_idx, ds_idx);
1527         } else {
1528             /* Nothing to consolidate if there's one PDP per CDP. However, if
1529              * we've missed some PDPs, let's update null counters etc. */
1530             if (elapsed_pdp_st > 2) {
1531                 reset_cdp(rrd, elapsed_pdp_st, pdp_temp, last_seasonal_coef,
1532                           seasonal_coef, rra_idx, ds_idx, cdp_idx,
1533                           (enum cf_en)current_cf);
1534             }
1535         }
1537         if (rrd_test_error())
1538             return -1;
1539     }                   /* endif data sources loop */
1540     return 0;
1543 /*
1544  * Given the new reading (pdp_temp_val), update or initialize the CDP value,
1545  * primary value, secondary value, and # of unknowns.
1546  */
1547 static void update_cdp(
1548     unival *scratch,
1549     int current_cf,
1550     rrd_value_t pdp_temp_val,
1551     unsigned long rra_step_cnt,
1552     unsigned long elapsed_pdp_st,
1553     unsigned long start_pdp_offset,
1554     unsigned long pdp_cnt,
1555     rrd_value_t xff,
1556     int i,
1557     int ii)
1559     /* shorthand variables */
1560     rrd_value_t *cdp_val = &scratch[CDP_val].u_val;
1561     rrd_value_t *cdp_primary_val = &scratch[CDP_primary_val].u_val;
1562     rrd_value_t *cdp_secondary_val = &scratch[CDP_secondary_val].u_val;
1563     unsigned long *cdp_unkn_pdp_cnt = &scratch[CDP_unkn_pdp_cnt].u_cnt;
1565     if (rra_step_cnt) {
1566         /* If we are in this block, as least 1 CDP value will be written to
1567          * disk, this is the CDP_primary_val entry. If more than 1 value needs
1568          * to be written, then the "fill in" value is the CDP_secondary_val
1569          * entry. */
1570         if (isnan(pdp_temp_val)) {
1571             *cdp_unkn_pdp_cnt += start_pdp_offset;
1572             *cdp_secondary_val = DNAN;
1573         } else {
1574             /* CDP_secondary value is the RRA "fill in" value for intermediary
1575              * CDP data entries. No matter the CF, the value is the same because
1576              * the average, max, min, and last of a list of identical values is
1577              * the same, namely, the value itself. */
1578             *cdp_secondary_val = pdp_temp_val;
1579         }
1581         if (*cdp_unkn_pdp_cnt > pdp_cnt * xff) {
1582             *cdp_primary_val = DNAN;
1583             if (current_cf == CF_AVERAGE) {
1584                 *cdp_val =
1585                     initialize_average_carry_over(pdp_temp_val,
1586                                                   elapsed_pdp_st,
1587                                                   start_pdp_offset, pdp_cnt);
1588             } else {
1589                 *cdp_val = pdp_temp_val;
1590             }
1591         } else {
1592             initialize_cdp_val(scratch, current_cf, pdp_temp_val,
1593                                elapsed_pdp_st, start_pdp_offset, pdp_cnt);
1594         }               /* endif meets xff value requirement for a valid value */
1595         /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
1596          * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1597         if (isnan(pdp_temp_val))
1598             *cdp_unkn_pdp_cnt = (elapsed_pdp_st - start_pdp_offset) % pdp_cnt;
1599         else
1600             *cdp_unkn_pdp_cnt = 0;
1601     } else {            /* rra_step_cnt[i]  == 0 */
1603 #ifdef DEBUG
1604         if (isnan(*cdp_val)) {
1605             fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, DNAN\n",
1606                     i, ii);
1607         } else {
1608             fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, %10.2f\n",
1609                     i, ii, *cdp_val);
1610         }
1611 #endif
1612         if (isnan(pdp_temp_val)) {
1613             *cdp_unkn_pdp_cnt += elapsed_pdp_st;
1614         } else {
1615             *cdp_val =
1616                 calculate_cdp_val(*cdp_val, pdp_temp_val, elapsed_pdp_st,
1617                                   current_cf, i, ii);
1618         }
1619     }
1622 /*
1623  * Set the CDP_primary_val and CDP_val to the appropriate initial value based
1624  * on the type of consolidation function.
1625  */
1626 static void initialize_cdp_val(
1627     unival *scratch,
1628     int current_cf,
1629     rrd_value_t pdp_temp_val,
1630     unsigned long elapsed_pdp_st,
1631     unsigned long start_pdp_offset,
1632     unsigned long pdp_cnt)
1634     rrd_value_t cum_val, cur_val;
1636     switch (current_cf) {
1637     case CF_AVERAGE:
1638         cum_val = IFDNAN(scratch[CDP_val].u_val, 0.0);
1639         cur_val = IFDNAN(pdp_temp_val, 0.0);
1640         scratch[CDP_primary_val].u_val =
1641             (cum_val + cur_val * start_pdp_offset) /
1642             (pdp_cnt - scratch[CDP_unkn_pdp_cnt].u_cnt);
1643         scratch[CDP_val].u_val =
1644             initialize_average_carry_over(pdp_temp_val, elapsed_pdp_st,
1645                                           start_pdp_offset, pdp_cnt);
1646         break;
1647     case CF_MAXIMUM:
1648         cum_val = IFDNAN(scratch[CDP_val].u_val, -DINF);
1649         cur_val = IFDNAN(pdp_temp_val, -DINF);
1650 #if 0
1651 #ifdef DEBUG
1652         if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1653             fprintf(stderr,
1654                     "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1655                     i, ii);
1656             exit(-1);
1657         }
1658 #endif
1659 #endif
1660         if (cur_val > cum_val)
1661             scratch[CDP_primary_val].u_val = cur_val;
1662         else
1663             scratch[CDP_primary_val].u_val = cum_val;
1664         /* initialize carry over value */
1665         scratch[CDP_val].u_val = pdp_temp_val;
1666         break;
1667     case CF_MINIMUM:
1668         cum_val = IFDNAN(scratch[CDP_val].u_val, DINF);
1669         cur_val = IFDNAN(pdp_temp_val, DINF);
1670 #if 0
1671 #ifdef DEBUG
1672         if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1673             fprintf(stderr,
1674                     "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!", i,
1675                     ii);
1676             exit(-1);
1677         }
1678 #endif
1679 #endif
1680         if (cur_val < cum_val)
1681             scratch[CDP_primary_val].u_val = cur_val;
1682         else
1683             scratch[CDP_primary_val].u_val = cum_val;
1684         /* initialize carry over value */
1685         scratch[CDP_val].u_val = pdp_temp_val;
1686         break;
1687     case CF_LAST:
1688     default:
1689         scratch[CDP_primary_val].u_val = pdp_temp_val;
1690         /* initialize carry over value */
1691         scratch[CDP_val].u_val = pdp_temp_val;
1692         break;
1693     }
1696 /*
1697  * Update the consolidation function for Holt-Winters functions as
1698  * well as other functions that don't actually consolidate multiple
1699  * PDPs.
1700  */
1701 static void reset_cdp(
1702     rrd_t *rrd,
1703     unsigned long elapsed_pdp_st,
1704     rrd_value_t *pdp_temp,
1705     rrd_value_t *last_seasonal_coef,
1706     rrd_value_t *seasonal_coef,
1707     int rra_idx,
1708     int ds_idx,
1709     int cdp_idx,
1710     enum cf_en current_cf)
1712     unival   *scratch = rrd->cdp_prep[cdp_idx].scratch;
1714     switch (current_cf) {
1715     case CF_AVERAGE:
1716     default:
1717         scratch[CDP_primary_val].u_val = pdp_temp[ds_idx];
1718         scratch[CDP_secondary_val].u_val = pdp_temp[ds_idx];
1719         break;
1720     case CF_SEASONAL:
1721     case CF_DEVSEASONAL:
1722         /* need to update cached seasonal values, so they are consistent
1723          * with the bulk update */
1724         /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1725          * CDP_last_deviation are the same. */
1726         scratch[CDP_hw_last_seasonal].u_val = last_seasonal_coef[ds_idx];
1727         scratch[CDP_hw_seasonal].u_val = seasonal_coef[ds_idx];
1728         break;
1729     case CF_HWPREDICT:
1730     case CF_MHWPREDICT:
1731         /* need to update the null_count and last_null_count.
1732          * even do this for non-DNAN pdp_temp because the
1733          * algorithm is not learning from batch updates. */
1734         scratch[CDP_null_count].u_cnt += elapsed_pdp_st;
1735         scratch[CDP_last_null_count].u_cnt += elapsed_pdp_st - 1;
1736         /* fall through */
1737     case CF_DEVPREDICT:
1738         scratch[CDP_primary_val].u_val = DNAN;
1739         scratch[CDP_secondary_val].u_val = DNAN;
1740         break;
1741     case CF_FAILURES:
1742         /* do not count missed bulk values as failures */
1743         scratch[CDP_primary_val].u_val = 0;
1744         scratch[CDP_secondary_val].u_val = 0;
1745         /* need to reset violations buffer.
1746          * could do this more carefully, but for now, just
1747          * assume a bulk update wipes away all violations. */
1748         erase_violations(rrd, cdp_idx, rra_idx);
1749         break;
1750     }
1753 static rrd_value_t initialize_average_carry_over(
1754     rrd_value_t pdp_temp_val,
1755     unsigned long elapsed_pdp_st,
1756     unsigned long start_pdp_offset,
1757     unsigned long pdp_cnt)
1759     /* initialize carry over value */
1760     if (isnan(pdp_temp_val)) {
1761         return DNAN;
1762     }
1763     return pdp_temp_val * ((elapsed_pdp_st - start_pdp_offset) % pdp_cnt);
1766 /*
1767  * Update or initialize a CDP value based on the consolidation
1768  * function.
1769  *
1770  * Returns the new value.
1771  */
1772 static rrd_value_t calculate_cdp_val(
1773     rrd_value_t cdp_val,
1774     rrd_value_t pdp_temp_val,
1775     unsigned long elapsed_pdp_st,
1776     int current_cf,
1777 #ifdef DEBUG
1778     int i,
1779     int ii
1780 #else
1781     int UNUSED(i),
1782     int UNUSED(ii)
1783 #endif
1784     )
1786     if (isnan(cdp_val)) {
1787         if (current_cf == CF_AVERAGE) {
1788             pdp_temp_val *= elapsed_pdp_st;
1789         }
1790 #ifdef DEBUG
1791         fprintf(stderr, "Initialize CDP_val for RRA %d DS %d: %10.2f\n",
1792                 i, ii, pdp_temp_val);
1793 #endif
1794         return pdp_temp_val;
1795     }
1796     if (current_cf == CF_AVERAGE)
1797         return cdp_val + pdp_temp_val * elapsed_pdp_st;
1798     if (current_cf == CF_MINIMUM)
1799         return (pdp_temp_val < cdp_val) ? pdp_temp_val : cdp_val;
1800     if (current_cf == CF_MAXIMUM)
1801         return (pdp_temp_val > cdp_val) ? pdp_temp_val : cdp_val;
1803     return pdp_temp_val;
1806 /*
1807  * For each RRA, update the seasonal values and then call update_aberrant_CF
1808  * for each data source.
1809  *
1810  * Return 0 on success, -1 on error.
1811  */
1812 static int update_aberrant_cdps(
1813     rrd_t *rrd,
1814     rrd_file_t *rrd_file,
1815     unsigned long rra_begin,
1816     unsigned long elapsed_pdp_st,
1817     rrd_value_t *pdp_temp,
1818     rrd_value_t **seasonal_coef)
1820     unsigned long rra_idx, ds_idx, j;
1822     /* number of PDP steps since the last update that
1823      * are assigned to the first CDP to be generated
1824      * since the last update. */
1825     unsigned short scratch_idx;
1826     unsigned long rra_start;
1827     enum cf_en current_cf;
1829     /* this loop is only entered if elapsed_pdp_st < 3 */
1830     for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1831          j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val) {
1832         rra_start = rra_begin;
1833         for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1834             if (rrd->rra_def[rra_idx].pdp_cnt == 1) {
1835                 current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1836                 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1837                     if (scratch_idx == CDP_primary_val) {
1838                         lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1839                                         elapsed_pdp_st + 1, seasonal_coef);
1840                     } else {
1841                         lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1842                                         elapsed_pdp_st + 2, seasonal_coef);
1843                     }
1844                 }
1845                 if (rrd_test_error())
1846                     return -1;
1847                 /* loop over data soures within each RRA */
1848                 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1849                     update_aberrant_CF(rrd, pdp_temp[ds_idx], current_cf,
1850                                        rra_idx * (rrd->stat_head->ds_cnt) +
1851                                        ds_idx, rra_idx, ds_idx, scratch_idx,
1852                                        *seasonal_coef);
1853                 }
1854             }
1855             rra_start += rrd->rra_def[rra_idx].row_cnt
1856                 * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
1857         }
1858     }
1859     return 0;
1862 /* 
1863  * Move sequentially through the file, writing one RRA at a time.  Note this
1864  * architecture divorces the computation of CDP with flushing updated RRA
1865  * entries to disk.
1866  *
1867  * Return 0 on success, -1 on error.
1868  */
1869 static int write_to_rras(
1870     rrd_t *rrd,
1871     rrd_file_t *rrd_file,
1872     unsigned long *rra_step_cnt,
1873     unsigned long rra_begin,
1874     time_t current_time,
1875     unsigned long *skip_update,
1876     rrd_info_t ** pcdp_summary)
1878     unsigned long rra_idx;
1879     unsigned long rra_start;
1880     time_t    rra_time = 0; /* time of update for a RRA */
1882     unsigned long ds_cnt = rrd->stat_head->ds_cnt;
1883     
1884     /* Ready to write to disk */
1885     rra_start = rra_begin;
1887     for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1888         rra_def_t *rra_def = &rrd->rra_def[rra_idx];
1889         rra_ptr_t *rra_ptr = &rrd->rra_ptr[rra_idx];
1891         /* for cdp_prep */
1892         unsigned short scratch_idx;
1893         unsigned long step_subtract;
1895         for (scratch_idx = CDP_primary_val,
1896                  step_subtract = 1;
1897              rra_step_cnt[rra_idx] > 0;
1898              rra_step_cnt[rra_idx]--,
1899                  scratch_idx = CDP_secondary_val,
1900                  step_subtract = 2) {
1902             size_t rra_pos_new;
1903 #ifdef DEBUG
1904             fprintf(stderr, "  -- RRA Preseek %ld\n", rrd_file->pos);
1905 #endif
1906             /* increment, with wrap-around */
1907             if (++rra_ptr->cur_row >= rra_def->row_cnt)
1908               rra_ptr->cur_row = 0;
1910             /* we know what our position should be */
1911             rra_pos_new = rra_start
1912               + ds_cnt * rra_ptr->cur_row * sizeof(rrd_value_t);
1914             /* re-seek if the position is wrong or we wrapped around */
1915             if (rra_pos_new != rrd_file->pos) {
1916                 if (rrd_seek(rrd_file, rra_pos_new, SEEK_SET) != 0) {
1917                     rrd_set_error("seek error in rrd");
1918                     return -1;
1919                 }
1920             }
1921 #ifdef DEBUG
1922             fprintf(stderr, "  -- RRA Postseek %ld\n", rrd_file->pos);
1923 #endif
1925             if (skip_update[rra_idx])
1926                 continue;
1928             if (*pcdp_summary != NULL) {
1929                 unsigned long step_time = rra_def->pdp_cnt * rrd->stat_head->pdp_step;
1931                 rra_time = (current_time - current_time % step_time)
1932                     - ((rra_step_cnt[rra_idx] - step_subtract) * step_time);
1933             }
1935             if (write_RRA_row
1936                 (rrd_file, rrd, rra_idx, scratch_idx,
1937                  pcdp_summary, rra_time) == -1)
1938                 return -1;
1939         }
1941         rra_start += rra_def->row_cnt * ds_cnt * sizeof(rrd_value_t);
1942     } /* RRA LOOP */
1944     return 0;
1947 /*
1948  * Write out one row of values (one value per DS) to the archive.
1949  *
1950  * Returns 0 on success, -1 on error.
1951  */
1952 static int write_RRA_row(
1953     rrd_file_t *rrd_file,
1954     rrd_t *rrd,
1955     unsigned long rra_idx,
1956     unsigned short CDP_scratch_idx,
1957     rrd_info_t ** pcdp_summary,
1958     time_t rra_time)
1960     unsigned long ds_idx, cdp_idx;
1961     rrd_infoval_t iv;
1963     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1964         /* compute the cdp index */
1965         cdp_idx = rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
1966 #ifdef DEBUG
1967         fprintf(stderr, "  -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1968                 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,
1969                 rrd_file->pos, rrd->rra_def[rra_idx].cf_nam);
1970 #endif
1971         if (*pcdp_summary != NULL) {
1972             iv.u_val = rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
1973             /* append info to the return hash */
1974             *pcdp_summary = rrd_info_push(*pcdp_summary,
1975                                           sprintf_alloc
1976                                           ("[%lli]RRA[%s][%lu]DS[%s]", (long long)rra_time,
1977                                            rrd->rra_def[rra_idx].cf_nam,
1978                                            rrd->rra_def[rra_idx].pdp_cnt,
1979                                            rrd->ds_def[ds_idx].ds_nam),
1980                                           RD_I_VAL, iv);
1981         }
1982         errno = 0;
1983         if (rrd_write(rrd_file,
1984                       &(rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].
1985                         u_val), sizeof(rrd_value_t)) != sizeof(rrd_value_t)) {
1986             rrd_set_error("writing rrd: %s", rrd_strerror(errno));
1987             return -1;
1988         }
1989     }
1990     return 0;
1993 /*
1994  * Call apply_smoother for all DEVSEASONAL and SEASONAL RRAs.
1995  *
1996  * Returns 0 on success, -1 otherwise
1997  */
1998 static int smooth_all_rras(
1999     rrd_t *rrd,
2000     rrd_file_t *rrd_file,
2001     unsigned long rra_begin)
2003     unsigned long rra_start = rra_begin;
2004     unsigned long rra_idx;
2006     for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; ++rra_idx) {
2007         if (cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_DEVSEASONAL ||
2008             cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_SEASONAL) {
2009 #ifdef DEBUG
2010             fprintf(stderr, "Running smoother for rra %lu\n", rra_idx);
2011 #endif
2012             apply_smoother(rrd, rra_idx, rra_start, rrd_file);
2013             if (rrd_test_error())
2014                 return -1;
2015         }
2016         rra_start += rrd->rra_def[rra_idx].row_cnt
2017             * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
2018     }
2019     return 0;
2022 #ifndef HAVE_MMAP
2023 /*
2024  * Flush changes to disk (unless we're using mmap)
2025  *
2026  * Returns 0 on success, -1 otherwise
2027  */
2028 static int write_changes_to_disk(
2029     rrd_t *rrd,
2030     rrd_file_t *rrd_file,
2031     int version)
2033     /* we just need to write back the live header portion now */
2034     if (rrd_seek(rrd_file, (sizeof(stat_head_t)
2035                             + sizeof(ds_def_t) * rrd->stat_head->ds_cnt
2036                             + sizeof(rra_def_t) * rrd->stat_head->rra_cnt),
2037                  SEEK_SET) != 0) {
2038         rrd_set_error("seek rrd for live header writeback");
2039         return -1;
2040     }
2041     if (version >= 3) {
2042         if (rrd_write(rrd_file, rrd->live_head,
2043                       sizeof(live_head_t) * 1) != sizeof(live_head_t) * 1) {
2044             rrd_set_error("rrd_write live_head to rrd");
2045             return -1;
2046         }
2047     } else {
2048         if (rrd_write(rrd_file, rrd->legacy_last_up,
2049                       sizeof(time_t) * 1) != sizeof(time_t) * 1) {
2050             rrd_set_error("rrd_write live_head to rrd");
2051             return -1;
2052         }
2053     }
2056     if (rrd_write(rrd_file, rrd->pdp_prep,
2057                   sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)
2058         != (ssize_t) (sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)) {
2059         rrd_set_error("rrd_write pdp_prep to rrd");
2060         return -1;
2061     }
2063     if (rrd_write(rrd_file, rrd->cdp_prep,
2064                   sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2065                   rrd->stat_head->ds_cnt)
2066         != (ssize_t) (sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2067                       rrd->stat_head->ds_cnt)) {
2069         rrd_set_error("rrd_write cdp_prep to rrd");
2070         return -1;
2071     }
2073     if (rrd_write(rrd_file, rrd->rra_ptr,
2074                   sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)
2075         != (ssize_t) (sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)) {
2076         rrd_set_error("rrd_write rra_ptr to rrd");
2077         return -1;
2078     }
2079     return 0;
2081 #endif