Code

control: Updated standards-version to 3.9.5 -- no changes.
[pkg-rrdtool.git] / src / rrd_update.c
1 /*****************************************************************************
2  * RRDtool 1.4.8  Copyright by Tobi Oetiker, 1997-2013
3  *                Copyright by Florian Forster, 2008
4  *****************************************************************************
5  * rrd_update.c  RRD Update Function
6  *****************************************************************************
7  * $Id$
8  *****************************************************************************/
10 #include "rrd_tool.h"
12 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
13 #include <sys/locking.h>
14 #include <sys/stat.h>
15 #include <io.h>
16 #endif
18 #include <locale.h>
20 #include "rrd_hw.h"
21 #include "rrd_rpncalc.h"
23 #include "rrd_is_thread_safe.h"
24 #include "unused.h"
26 #include "rrd_client.h"
28 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
29 /*
30  * WIN32 does not have gettimeofday     and struct timeval. This is a quick and dirty
31  * replacement.
32  */
33 #include <sys/timeb.h>
35 #ifndef __MINGW32__
36 struct timeval {
37     time_t    tv_sec;   /* seconds */
38     long      tv_usec;  /* microseconds */
39 };
40 #endif
42 struct __timezone {
43     int       tz_minuteswest;   /* minutes W of Greenwich */
44     int       tz_dsttime;   /* type of dst correction */
45 };
47 static int gettimeofday(
48     struct timeval *t,
49     struct __timezone *tz)
50 {
52     struct _timeb current_time;
54     _ftime(&current_time);
56     t->tv_sec = current_time.time;
57     t->tv_usec = current_time.millitm * 1000;
59     return 0;
60 }
62 #endif
64 /* FUNCTION PROTOTYPES */
66 int       rrd_update_r(
67     const char *filename,
68     const char *tmplt,
69     int argc,
70     const char **argv);
71 int       _rrd_update(
72     const char *filename,
73     const char *tmplt,
74     int argc,
75     const char **argv,
76     rrd_info_t *);
78 static int allocate_data_structures(
79     rrd_t *rrd,
80     char ***updvals,
81     rrd_value_t **pdp_temp,
82     const char *tmplt,
83     long **tmpl_idx,
84     unsigned long *tmpl_cnt,
85     unsigned long **rra_step_cnt,
86     unsigned long **skip_update,
87     rrd_value_t **pdp_new);
89 static int parse_template(
90     rrd_t *rrd,
91     const char *tmplt,
92     unsigned long *tmpl_cnt,
93     long *tmpl_idx);
95 static int process_arg(
96     char *step_start,
97     rrd_t *rrd,
98     rrd_file_t *rrd_file,
99     unsigned long rra_begin,
100     time_t *current_time,
101     unsigned long *current_time_usec,
102     rrd_value_t *pdp_temp,
103     rrd_value_t *pdp_new,
104     unsigned long *rra_step_cnt,
105     char **updvals,
106     long *tmpl_idx,
107     unsigned long tmpl_cnt,
108     rrd_info_t ** pcdp_summary,
109     int version,
110     unsigned long *skip_update,
111     int *schedule_smooth);
113 static int parse_ds(
114     rrd_t *rrd,
115     char **updvals,
116     long *tmpl_idx,
117     char *input,
118     unsigned long tmpl_cnt,
119     time_t *current_time,
120     unsigned long *current_time_usec,
121     int version);
123 static int get_time_from_reading(
124     rrd_t *rrd,
125     char timesyntax,
126     char **updvals,
127     time_t *current_time,
128     unsigned long *current_time_usec,
129     int version);
131 static int update_pdp_prep(
132     rrd_t *rrd,
133     char **updvals,
134     rrd_value_t *pdp_new,
135     double interval);
137 static int calculate_elapsed_steps(
138     rrd_t *rrd,
139     unsigned long current_time,
140     unsigned long current_time_usec,
141     double interval,
142     double *pre_int,
143     double *post_int,
144     unsigned long *proc_pdp_cnt);
146 static void simple_update(
147     rrd_t *rrd,
148     double interval,
149     rrd_value_t *pdp_new);
151 static int process_all_pdp_st(
152     rrd_t *rrd,
153     double interval,
154     double pre_int,
155     double post_int,
156     unsigned long elapsed_pdp_st,
157     rrd_value_t *pdp_new,
158     rrd_value_t *pdp_temp);
160 static int process_pdp_st(
161     rrd_t *rrd,
162     unsigned long ds_idx,
163     double interval,
164     double pre_int,
165     double post_int,
166     long diff_pdp_st,
167     rrd_value_t *pdp_new,
168     rrd_value_t *pdp_temp);
170 static int update_all_cdp_prep(
171     rrd_t *rrd,
172     unsigned long *rra_step_cnt,
173     unsigned long rra_begin,
174     rrd_file_t *rrd_file,
175     unsigned long elapsed_pdp_st,
176     unsigned long proc_pdp_cnt,
177     rrd_value_t **last_seasonal_coef,
178     rrd_value_t **seasonal_coef,
179     rrd_value_t *pdp_temp,
180     unsigned long *skip_update,
181     int *schedule_smooth);
183 static int do_schedule_smooth(
184     rrd_t *rrd,
185     unsigned long rra_idx,
186     unsigned long elapsed_pdp_st);
188 static int update_cdp_prep(
189     rrd_t *rrd,
190     unsigned long elapsed_pdp_st,
191     unsigned long start_pdp_offset,
192     unsigned long *rra_step_cnt,
193     int rra_idx,
194     rrd_value_t *pdp_temp,
195     rrd_value_t *last_seasonal_coef,
196     rrd_value_t *seasonal_coef,
197     int current_cf);
199 static void update_cdp(
200     unival *scratch,
201     int current_cf,
202     rrd_value_t pdp_temp_val,
203     unsigned long rra_step_cnt,
204     unsigned long elapsed_pdp_st,
205     unsigned long start_pdp_offset,
206     unsigned long pdp_cnt,
207     rrd_value_t xff,
208     int i,
209     int ii);
211 static void initialize_cdp_val(
212     unival *scratch,
213     int current_cf,
214     rrd_value_t pdp_temp_val,
215     unsigned long start_pdp_offset,
216     unsigned long pdp_cnt);
218 static void reset_cdp(
219     rrd_t *rrd,
220     unsigned long elapsed_pdp_st,
221     rrd_value_t *pdp_temp,
222     rrd_value_t *last_seasonal_coef,
223     rrd_value_t *seasonal_coef,
224     int rra_idx,
225     int ds_idx,
226     int cdp_idx,
227     enum cf_en current_cf);
229 static rrd_value_t initialize_carry_over(
230     rrd_value_t pdp_temp_val,
231     int         current_cf,
232     unsigned long elapsed_pdp_st,
233     unsigned long start_pdp_offset,
234     unsigned long pdp_cnt);
236 static rrd_value_t calculate_cdp_val(
237     rrd_value_t cdp_val,
238     rrd_value_t pdp_temp_val,
239     unsigned long elapsed_pdp_st,
240     int current_cf,
241     int i,
242     int ii);
244 static int update_aberrant_cdps(
245     rrd_t *rrd,
246     rrd_file_t *rrd_file,
247     unsigned long rra_begin,
248     unsigned long elapsed_pdp_st,
249     rrd_value_t *pdp_temp,
250     rrd_value_t **seasonal_coef);
252 static int write_to_rras(
253     rrd_t *rrd,
254     rrd_file_t *rrd_file,
255     unsigned long *rra_step_cnt,
256     unsigned long rra_begin,
257     time_t current_time,
258     unsigned long *skip_update,
259     rrd_info_t ** pcdp_summary);
261 static int write_RRA_row(
262     rrd_file_t *rrd_file,
263     rrd_t *rrd,
264     unsigned long rra_idx,
265     unsigned short CDP_scratch_idx,
266     rrd_info_t ** pcdp_summary,
267     time_t rra_time);
269 static int smooth_all_rras(
270     rrd_t *rrd,
271     rrd_file_t *rrd_file,
272     unsigned long rra_begin);
274 #ifndef HAVE_MMAP
275 static int write_changes_to_disk(
276     rrd_t *rrd,
277     rrd_file_t *rrd_file,
278     int version);
279 #endif
281 /*
282  * normalize time as returned by gettimeofday. usec part must
283  * be always >= 0
284  */
285 static void normalize_time(
286     struct timeval *t)
288     if (t->tv_usec < 0) {
289         t->tv_sec--;
290         t->tv_usec += 1e6L;
291     }
294 /*
295  * Sets current_time and current_time_usec based on the current time.
296  * current_time_usec is set to 0 if the version number is 1 or 2.
297  */
298 static void initialize_time(
299     time_t *current_time,
300     unsigned long *current_time_usec,
301     int version)
303     struct timeval tmp_time;    /* used for time conversion */
305     gettimeofday(&tmp_time, 0);
306     normalize_time(&tmp_time);
307     *current_time = tmp_time.tv_sec;
308     if (version >= 3) {
309         *current_time_usec = tmp_time.tv_usec;
310     } else {
311         *current_time_usec = 0;
312     }
315 #define IFDNAN(X,Y) (isnan(X) ? (Y) : (X));
317 rrd_info_t *rrd_update_v(
318     int argc,
319     char **argv)
321     char     *tmplt = NULL;
322     rrd_info_t *result = NULL;
323     rrd_infoval_t rc;
324     char *opt_daemon = NULL;
325     struct option long_options[] = {
326         {"template", required_argument, 0, 't'},
327         {0, 0, 0, 0}
328     };
330     rc.u_int = -1;
331     optind = 0;
332     opterr = 0;         /* initialize getopt */
334     while (1) {
335         int       option_index = 0;
336         int       opt;
338         opt = getopt_long(argc, argv, "t:", long_options, &option_index);
340         if (opt == EOF)
341             break;
343         switch (opt) {
344         case 't':
345             tmplt = optarg;
346             break;
348         case '?':
349             rrd_set_error("unknown option '%s'", argv[optind - 1]);
350             goto end_tag;
351         }
352     }
354     opt_daemon = getenv (ENV_RRDCACHED_ADDRESS);
355     if (opt_daemon != NULL) {
356         rrd_set_error ("The \"%s\" environment variable is defined, "
357                 "but \"%s\" cannot work with rrdcached. Either unset "
358                 "the environment variable or use \"update\" instead.",
359                 ENV_RRDCACHED_ADDRESS, argv[0]);
360         goto end_tag;
361     }
363     /* need at least 2 arguments: filename, data. */
364     if (argc - optind < 2) {
365         rrd_set_error("Not enough arguments");
366         goto end_tag;
367     }
368     rc.u_int = 0;
369     result = rrd_info_push(NULL, sprintf_alloc("return_value"), RD_I_INT, rc);
370     rc.u_int = _rrd_update(argv[optind], tmplt,
371                            argc - optind - 1,
372                            (const char **) (argv + optind + 1), result);
373     result->value.u_int = rc.u_int;
374   end_tag:
375     return result;
378 int rrd_update(
379     int argc,
380     char **argv)
382     struct option long_options[] = {
383         {"template", required_argument, 0, 't'},
384         {"daemon",   required_argument, 0, 'd'},
385         {0, 0, 0, 0}
386     };
387     int       option_index = 0;
388     int       opt;
389     char     *tmplt = NULL;
390     int       rc = -1;
391     char     *opt_daemon = NULL;
393     optind = 0;
394     opterr = 0;         /* initialize getopt */
396     while (1) {
397         opt = getopt_long(argc, argv, "t:d:", long_options, &option_index);
399         if (opt == EOF)
400             break;
402         switch (opt) {
403         case 't':
404             tmplt = strdup(optarg);
405             break;
407         case 'd':
408             if (opt_daemon != NULL)
409                 free (opt_daemon);
410             opt_daemon = strdup (optarg);
411             if (opt_daemon == NULL)
412             {
413                 rrd_set_error("strdup failed.");
414                 goto out;
415             }
416             break;
418         case '?':
419             rrd_set_error("unknown option '%s'", argv[optind - 1]);
420             goto out;
421         }
422     }
424     /* need at least 2 arguments: filename, data. */
425     if (argc - optind < 2) {
426         rrd_set_error("Not enough arguments");
427         goto out;
428     }
430     {   /* try to connect to rrdcached */
431         int status = rrdc_connect(opt_daemon);
432         if (status != 0) {
433              rc = status;
434              goto out;
435         }
436     }
438     if ((tmplt != NULL) && rrdc_is_connected(opt_daemon))
439     {
440         rrd_set_error("The caching daemon cannot be used together with "
441                 "templates yet.");
442         goto out;
443     }
445     if (! rrdc_is_connected(opt_daemon))
446     {
447       rc = rrd_update_r(argv[optind], tmplt,
448                         argc - optind - 1, (const char **) (argv + optind + 1));
449     }
450     else /* we are connected */
451     {
452         rc = rrdc_update (argv[optind], /* file */
453                           argc - optind - 1, /* values_num */
454                           (const char *const *) (argv + optind + 1)); /* values */
455         if (rc > 0)
456             rrd_set_error("Failed sending the values to rrdcached: %s",
457                           rrd_strerror (rc));
458     }
460   out:
461     if (tmplt != NULL)
462     {
463         free(tmplt);
464         tmplt = NULL;
465     }
466     if (opt_daemon != NULL)
467     {
468         free (opt_daemon);
469         opt_daemon = NULL;
470     }
471     return rc;
474 int rrd_update_r(
475     const char *filename,
476     const char *tmplt,
477     int argc,
478     const char **argv)
480     return _rrd_update(filename, tmplt, argc, argv, NULL);
483 int _rrd_update(
484     const char *filename,
485     const char *tmplt,
486     int argc,
487     const char **argv,
488     rrd_info_t * pcdp_summary)
491     int       arg_i = 2;
493     unsigned long rra_begin;    /* byte pointer to the rra
494                                  * area in the rrd file.  this
495                                  * pointer never changes value */
496     rrd_value_t *pdp_new;   /* prepare the incoming data to be added 
497                              * to the existing entry */
498     rrd_value_t *pdp_temp;  /* prepare the pdp values to be added 
499                              * to the cdp values */
501     long     *tmpl_idx; /* index representing the settings
502                          * transported by the tmplt index */
503     unsigned long tmpl_cnt = 2; /* time and data */
504     rrd_t     rrd;
505     time_t    current_time = 0;
506     unsigned long current_time_usec = 0;    /* microseconds part of current time */
507     char    **updvals;
508     int       schedule_smooth = 0;
510     /* number of elapsed PDP steps since last update */
511     unsigned long *rra_step_cnt = NULL;
513     int       version;  /* rrd version */
514     rrd_file_t *rrd_file;
515     char     *arg_copy; /* for processing the argv */
516     unsigned long *skip_update; /* RRAs to advance but not write */
518     /* need at least 1 arguments: data. */
519     if (argc < 1) {
520         rrd_set_error("Not enough arguments");
521         goto err_out;
522     }
524     rrd_init(&rrd);
525     if ((rrd_file = rrd_open(filename, &rrd, RRD_READWRITE)) == NULL) {
526         goto err_free;
527     }
528     /* We are now at the beginning of the rra's */
529     rra_begin = rrd_file->header_len;
531     version = atoi(rrd.stat_head->version);
533     initialize_time(&current_time, &current_time_usec, version);
535     /* get exclusive lock to whole file.
536      * lock gets removed when we close the file.
537      */
538     if (rrd_lock(rrd_file) != 0) {
539         rrd_set_error("could not lock RRD");
540         goto err_close;
541     }
543     if (allocate_data_structures(&rrd, &updvals,
544                                  &pdp_temp, tmplt, &tmpl_idx, &tmpl_cnt,
545                                  &rra_step_cnt, &skip_update,
546                                  &pdp_new) == -1) {
547         goto err_close;
548     }
550     /* loop through the arguments. */
551     for (arg_i = 0; arg_i < argc; arg_i++) {
552         if ((arg_copy = strdup(argv[arg_i])) == NULL) {
553             rrd_set_error("failed duplication argv entry");
554             break;
555         }
556         if (process_arg(arg_copy, &rrd, rrd_file, rra_begin,
557                         &current_time, &current_time_usec, pdp_temp, pdp_new,
558                         rra_step_cnt, updvals, tmpl_idx, tmpl_cnt,
559                         &pcdp_summary, version, skip_update,
560                         &schedule_smooth) == -1) {
561             if (rrd_test_error()) { /* Should have error string always here */
562                 char     *save_error;
564                 /* Prepend file name to error message */
565                 if ((save_error = strdup(rrd_get_error())) != NULL) {
566                     rrd_set_error("%s: %s", filename, save_error);
567                     free(save_error);
568                 }
569             }
570             free(arg_copy);
571             break;
572         }
573         free(arg_copy);
574     }
576     free(rra_step_cnt);
578     /* if we got here and if there is an error and if the file has not been
579      * written to, then close things up and return. */
580     if (rrd_test_error()) {
581         goto err_free_structures;
582     }
583 #ifndef HAVE_MMAP
584     if (write_changes_to_disk(&rrd, rrd_file, version) == -1) {
585         goto err_free_structures;
586     }
587 #endif
589     /* calling the smoothing code here guarantees at most one smoothing
590      * operation per rrd_update call. Unfortunately, it is possible with bulk
591      * updates, or a long-delayed update for smoothing to occur off-schedule.
592      * This really isn't critical except during the burn-in cycles. */
593     if (schedule_smooth) {
594         smooth_all_rras(&rrd, rrd_file, rra_begin);
595     }
597 /*    rrd_dontneed(rrd_file,&rrd); */
598     rrd_free(&rrd);
599     rrd_close(rrd_file);
601     free(pdp_new);
602     free(tmpl_idx);
603     free(pdp_temp);
604     free(skip_update);
605     free(updvals);
606     return 0;
608   err_free_structures:
609     free(pdp_new);
610     free(tmpl_idx);
611     free(pdp_temp);
612     free(skip_update);
613     free(updvals);
614   err_close:
615     rrd_close(rrd_file);
616   err_free:
617     rrd_free(&rrd);
618   err_out:
619     return -1;
622 /*
623  * Allocate some important arrays used, and initialize the template.
624  *
625  * When it returns, either all of the structures are allocated
626  * or none of them are.
627  *
628  * Returns 0 on success, -1 on error.
629  */
630 static int allocate_data_structures(
631     rrd_t *rrd,
632     char ***updvals,
633     rrd_value_t **pdp_temp,
634     const char *tmplt,
635     long **tmpl_idx,
636     unsigned long *tmpl_cnt,
637     unsigned long **rra_step_cnt,
638     unsigned long **skip_update,
639     rrd_value_t **pdp_new)
641     unsigned  i, ii;
642     if ((*updvals = (char **) malloc(sizeof(char *)
643                                      * (rrd->stat_head->ds_cnt + 1))) == NULL) {
644         rrd_set_error("allocating updvals pointer array.");
645         return -1;
646     }
647     if ((*pdp_temp = (rrd_value_t *) malloc(sizeof(rrd_value_t)
648                                             * rrd->stat_head->ds_cnt)) ==
649         NULL) {
650         rrd_set_error("allocating pdp_temp.");
651         goto err_free_updvals;
652     }
653     if ((*skip_update = (unsigned long *) malloc(sizeof(unsigned long)
654                                                  *
655                                                  rrd->stat_head->rra_cnt)) ==
656         NULL) {
657         rrd_set_error("allocating skip_update.");
658         goto err_free_pdp_temp;
659     }
660     if ((*tmpl_idx = (long *) malloc(sizeof(unsigned long)
661                                      * (rrd->stat_head->ds_cnt + 1))) == NULL) {
662         rrd_set_error("allocating tmpl_idx.");
663         goto err_free_skip_update;
664     }
665     if ((*rra_step_cnt = (unsigned long *) malloc(sizeof(unsigned long)
666                                                   *
667                                                   (rrd->stat_head->
668                                                    rra_cnt))) == NULL) {
669         rrd_set_error("allocating rra_step_cnt.");
670         goto err_free_tmpl_idx;
671     }
673     /* initialize tmplt redirector */
674     /* default config example (assume DS 1 is a CDEF DS)
675        tmpl_idx[0] -> 0; (time)
676        tmpl_idx[1] -> 1; (DS 0)
677        tmpl_idx[2] -> 3; (DS 2)
678        tmpl_idx[3] -> 4; (DS 3) */
679     (*tmpl_idx)[0] = 0; /* time */
680     for (i = 1, ii = 1; i <= rrd->stat_head->ds_cnt; i++) {
681         if (dst_conv(rrd->ds_def[i - 1].dst) != DST_CDEF)
682             (*tmpl_idx)[ii++] = i;
683     }
684     *tmpl_cnt = ii;
686     if (tmplt != NULL) {
687         if (parse_template(rrd, tmplt, tmpl_cnt, *tmpl_idx) == -1) {
688             goto err_free_rra_step_cnt;
689         }
690     }
692     if ((*pdp_new = (rrd_value_t *) malloc(sizeof(rrd_value_t)
693                                            * rrd->stat_head->ds_cnt)) == NULL) {
694         rrd_set_error("allocating pdp_new.");
695         goto err_free_rra_step_cnt;
696     }
698     return 0;
700   err_free_rra_step_cnt:
701     free(*rra_step_cnt);
702   err_free_tmpl_idx:
703     free(*tmpl_idx);
704   err_free_skip_update:
705     free(*skip_update);
706   err_free_pdp_temp:
707     free(*pdp_temp);
708   err_free_updvals:
709     free(*updvals);
710     return -1;
713 /*
714  * Parses tmplt and puts an ordered list of DS's into tmpl_idx.
715  *
716  * Returns 0 on success.
717  */
718 static int parse_template(
719     rrd_t *rrd,
720     const char *tmplt,
721     unsigned long *tmpl_cnt,
722     long *tmpl_idx)
724     char     *dsname, *tmplt_copy;
725     unsigned int tmpl_len, i;
726     int       ret = 0;
728     *tmpl_cnt = 1;      /* the first entry is the time */
730     /* we should work on a writeable copy here */
731     if ((tmplt_copy = strdup(tmplt)) == NULL) {
732         rrd_set_error("error copying tmplt '%s'", tmplt);
733         ret = -1;
734         goto out;
735     }
737     dsname = tmplt_copy;
738     tmpl_len = strlen(tmplt_copy);
739     for (i = 0; i <= tmpl_len; i++) {
740         if (tmplt_copy[i] == ':' || tmplt_copy[i] == '\0') {
741             tmplt_copy[i] = '\0';
742             if (*tmpl_cnt > rrd->stat_head->ds_cnt) {
743                 rrd_set_error("tmplt contains more DS definitions than RRD");
744                 ret = -1;
745                 goto out_free_tmpl_copy;
746             }
747             if ((tmpl_idx[(*tmpl_cnt)++] = ds_match(rrd, dsname) + 1) == 0) {
748                 rrd_set_error("unknown DS name '%s'", dsname);
749                 ret = -1;
750                 goto out_free_tmpl_copy;
751             }
752             /* go to the next entry on the tmplt_copy */
753             if (i < tmpl_len)
754                 dsname = &tmplt_copy[i + 1];
755         }
756     }
757   out_free_tmpl_copy:
758     free(tmplt_copy);
759   out:
760     return ret;
763 /*
764  * Parse an update string, updates the primary data points (PDPs)
765  * and consolidated data points (CDPs), and writes changes to the RRAs.
766  *
767  * Returns 0 on success, -1 on error.
768  */
769 static int process_arg(
770     char *step_start,
771     rrd_t *rrd,
772     rrd_file_t *rrd_file,
773     unsigned long rra_begin,
774     time_t *current_time,
775     unsigned long *current_time_usec,
776     rrd_value_t *pdp_temp,
777     rrd_value_t *pdp_new,
778     unsigned long *rra_step_cnt,
779     char **updvals,
780     long *tmpl_idx,
781     unsigned long tmpl_cnt,
782     rrd_info_t ** pcdp_summary,
783     int version,
784     unsigned long *skip_update,
785     int *schedule_smooth)
787     rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
789     /* a vector of future Holt-Winters seasonal coefs */
790     unsigned long elapsed_pdp_st;
792     double    interval, pre_int, post_int;  /* interval between this and
793                                              * the last run */
794     unsigned long proc_pdp_cnt;
796     if (parse_ds(rrd, updvals, tmpl_idx, step_start, tmpl_cnt,
797                  current_time, current_time_usec, version) == -1) {
798         return -1;
799     }
801     interval = (double) (*current_time - rrd->live_head->last_up)
802         + (double) ((long) *current_time_usec -
803                     (long) rrd->live_head->last_up_usec) / 1e6f;
805     /* process the data sources and update the pdp_prep 
806      * area accordingly */
807     if (update_pdp_prep(rrd, updvals, pdp_new, interval) == -1) {
808         return -1;
809     }
811     elapsed_pdp_st = calculate_elapsed_steps(rrd,
812                                              *current_time,
813                                              *current_time_usec, interval,
814                                              &pre_int, &post_int,
815                                              &proc_pdp_cnt);
817     /* has a pdp_st moment occurred since the last run ? */
818     if (elapsed_pdp_st == 0) {
819         /* no we have not passed a pdp_st moment. therefore update is simple */
820         simple_update(rrd, interval, pdp_new);
821     } else {
822         /* an pdp_st has occurred. */
823         if (process_all_pdp_st(rrd, interval,
824                                pre_int, post_int,
825                                elapsed_pdp_st, pdp_new, pdp_temp) == -1) {
826             return -1;
827         }
828         if (update_all_cdp_prep(rrd, rra_step_cnt,
829                                 rra_begin, rrd_file,
830                                 elapsed_pdp_st,
831                                 proc_pdp_cnt,
832                                 &last_seasonal_coef,
833                                 &seasonal_coef,
834                                 pdp_temp,
835                                 skip_update, schedule_smooth) == -1) {
836             goto err_free_coefficients;
837         }
838         if (update_aberrant_cdps(rrd, rrd_file, rra_begin,
839                                  elapsed_pdp_st, pdp_temp,
840                                  &seasonal_coef) == -1) {
841             goto err_free_coefficients;
842         }
843         if (write_to_rras(rrd, rrd_file, rra_step_cnt, rra_begin,
844                           *current_time, skip_update,
845                           pcdp_summary) == -1) {
846             goto err_free_coefficients;
847         }
848     }                   /* endif a pdp_st has occurred */
849     rrd->live_head->last_up = *current_time;
850     rrd->live_head->last_up_usec = *current_time_usec;
852     if (version < 3) {
853         *rrd->legacy_last_up = rrd->live_head->last_up;
854     }
855     free(seasonal_coef);
856     free(last_seasonal_coef);
857     return 0;
859   err_free_coefficients:
860     free(seasonal_coef);
861     free(last_seasonal_coef);
862     return -1;
865 /*
866  * Parse a DS string (time + colon-separated values), storing the
867  * results in current_time, current_time_usec, and updvals.
868  *
869  * Returns 0 on success, -1 on error.
870  */
871 static int parse_ds(
872     rrd_t *rrd,
873     char **updvals,
874     long *tmpl_idx,
875     char *input,
876     unsigned long tmpl_cnt,
877     time_t *current_time,
878     unsigned long *current_time_usec,
879     int version)
881     char     *p;
882     unsigned long i;
883     char      timesyntax;
885     updvals[0] = input;
886     /* initialize all ds input to unknown except the first one
887        which has always got to be set */
888     for (i = 1; i <= rrd->stat_head->ds_cnt; i++)
889         updvals[i] = "U";
891     /* separate all ds elements; first must be examined separately
892        due to alternate time syntax */
893     if ((p = strchr(input, '@')) != NULL) {
894         timesyntax = '@';
895     } else if ((p = strchr(input, ':')) != NULL) {
896         timesyntax = ':';
897     } else {
898         rrd_set_error("expected timestamp not found in data source from %s",
899                       input);
900         return -1;
901     }
902     *p = '\0';
903     i = 1;
904     updvals[tmpl_idx[i++]] = p + 1;
905     while (*(++p)) {
906         if (*p == ':') {
907             *p = '\0';
908             if (i < tmpl_cnt) {
909                 updvals[tmpl_idx[i++]] = p + 1;
910             }
911             else {
912                 rrd_set_error("found extra data on update argument: %s",p+1);
913                 return -1;
914             }                
915         }
916     }
918     if (i != tmpl_cnt) {
919         rrd_set_error("expected %lu data source readings (got %lu) from %s",
920                       tmpl_cnt - 1, i - 1, input);
921         return -1;
922     }
924     if (get_time_from_reading(rrd, timesyntax, updvals,
925                               current_time, current_time_usec,
926                               version) == -1) {
927         return -1;
928     }
929     return 0;
932 /*
933  * Parse the time in a DS string, store it in current_time and 
934  * current_time_usec and verify that it's later than the last
935  * update for this DS.
936  *
937  * Returns 0 on success, -1 on error.
938  */
939 static int get_time_from_reading(
940     rrd_t *rrd,
941     char timesyntax,
942     char **updvals,
943     time_t *current_time,
944     unsigned long *current_time_usec,
945     int version)
947     double    tmp;
948     char     *parsetime_error = NULL;
949     char     *old_locale;
950     rrd_time_value_t ds_tv;
951     struct timeval tmp_time;    /* used for time conversion */
953     /* get the time from the reading ... handle N */
954     if (timesyntax == '@') {    /* at-style */
955         if ((parsetime_error = rrd_parsetime(updvals[0], &ds_tv))) {
956             rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error);
957             return -1;
958         }
959         if (ds_tv.type == RELATIVE_TO_END_TIME ||
960             ds_tv.type == RELATIVE_TO_START_TIME) {
961             rrd_set_error("specifying time relative to the 'start' "
962                           "or 'end' makes no sense here: %s", updvals[0]);
963             return -1;
964         }
965         *current_time = mktime(&ds_tv.tm) +ds_tv.offset;
966         *current_time_usec = 0; /* FIXME: how to handle usecs here ? */
967     } else if (strcmp(updvals[0], "N") == 0) {
968         gettimeofday(&tmp_time, 0);
969         normalize_time(&tmp_time);
970         *current_time = tmp_time.tv_sec;
971         *current_time_usec = tmp_time.tv_usec;
972     } else {
973         old_locale = setlocale(LC_NUMERIC, NULL);
974         setlocale(LC_NUMERIC, "C");
975         errno = 0;
976         tmp = strtod(updvals[0], 0);
977         if (errno > 0) {
978             rrd_set_error("converting '%s' to float: %s",
979                 updvals[0], rrd_strerror(errno));
980             return -1;
981         };
982         setlocale(LC_NUMERIC, old_locale);
983         if (tmp < 0.0){
984             gettimeofday(&tmp_time, 0);
985             tmp = (double)tmp_time.tv_sec + (double)tmp_time.tv_usec * 1e-6f + tmp;
986         }
988         *current_time = floor(tmp);
989         *current_time_usec = (long) ((tmp - (double) *current_time) * 1e6f);
990     }
991     /* dont do any correction for old version RRDs */
992     if (version < 3)
993         *current_time_usec = 0;
995     if (*current_time < rrd->live_head->last_up ||
996         (*current_time == rrd->live_head->last_up &&
997          (long) *current_time_usec <= (long) rrd->live_head->last_up_usec)) {
998         rrd_set_error("illegal attempt to update using time %ld when "
999                       "last update time is %ld (minimum one second step)",
1000                       *current_time, rrd->live_head->last_up);
1001         return -1;
1002     }
1003     return 0;
1006 /*
1007  * Update pdp_new by interpreting the updvals according to the DS type
1008  * (COUNTER, GAUGE, etc.).
1009  *
1010  * Returns 0 on success, -1 on error.
1011  */
1012 static int update_pdp_prep(
1013     rrd_t *rrd,
1014     char **updvals,
1015     rrd_value_t *pdp_new,
1016     double interval)
1018     unsigned long ds_idx;
1019     int       ii;
1020     char     *endptr;   /* used in the conversion */
1021     double    rate;
1022     char     *old_locale;
1023     enum dst_en dst_idx;
1025     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1026         dst_idx = dst_conv(rrd->ds_def[ds_idx].dst);
1028         /* make sure we do not build diffs with old last_ds values */
1029         if (rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt < interval) {
1030             strncpy(rrd->pdp_prep[ds_idx].last_ds, "U", LAST_DS_LEN - 1);
1031             rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
1032         }
1034         /* NOTE: DST_CDEF should never enter this if block, because
1035          * updvals[ds_idx+1][0] is initialized to 'U'; unless the caller
1036          * accidently specified a value for the DST_CDEF. To handle this case,
1037          * an extra check is required. */
1039         if ((updvals[ds_idx + 1][0] != 'U') &&
1040             (dst_idx != DST_CDEF) &&
1041             rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt >= interval) {
1042             rate = DNAN;
1044             /* pdp_new contains rate * time ... eg the bytes transferred during
1045              * the interval. Doing it this way saves a lot of math operations
1046              */
1047             switch (dst_idx) {
1048             case DST_COUNTER:
1049             case DST_DERIVE:
1050                 /* Check if this is a valid integer. `U' is already handled in
1051                  * another branch. */
1052                 for (ii = 0; updvals[ds_idx + 1][ii] != 0; ii++) {
1053                     if ((ii == 0) && (dst_idx == DST_DERIVE)
1054                             && (updvals[ds_idx + 1][ii] == '-'))
1055                         continue;
1057                     if ((updvals[ds_idx + 1][ii] < '0')
1058                             || (updvals[ds_idx + 1][ii] > '9')) {
1059                         rrd_set_error("not a simple %s integer: '%s'",
1060                                 (dst_idx == DST_DERIVE) ? "signed" : "unsigned",
1061                                 updvals[ds_idx + 1]);
1062                         return -1;
1063                     }
1064                 } /* for (ii = 0; updvals[ds_idx + 1][ii] != 0; ii++) */
1066                 if (rrd->pdp_prep[ds_idx].last_ds[0] != 'U') {
1067                     pdp_new[ds_idx] =
1068                         rrd_diff(updvals[ds_idx + 1],
1069                                  rrd->pdp_prep[ds_idx].last_ds);
1070                     if (dst_idx == DST_COUNTER) {
1071                         /* simple overflow catcher. This will fail
1072                          * terribly for non 32 or 64 bit counters
1073                          * ... are there any others in SNMP land?
1074                          */
1075                         if (pdp_new[ds_idx] < (double) 0.0)
1076                             pdp_new[ds_idx] += (double) 4294967296.0;   /* 2^32 */
1077                         if (pdp_new[ds_idx] < (double) 0.0)
1078                             pdp_new[ds_idx] += (double) 18446744069414584320.0; /* 2^64-2^32 */
1079                     }
1080                     rate = pdp_new[ds_idx] / interval;
1081                 } else {
1082                     pdp_new[ds_idx] = DNAN;
1083                 }
1084                 break;
1085             case DST_ABSOLUTE:
1086                 old_locale = setlocale(LC_NUMERIC, NULL);
1087                 setlocale(LC_NUMERIC, "C");
1088                 errno = 0;
1089                 pdp_new[ds_idx] = strtod(updvals[ds_idx + 1], &endptr);
1090                 if (errno > 0) {
1091                     rrd_set_error("converting '%s' to float: %s",
1092                                   updvals[ds_idx + 1], rrd_strerror(errno));
1093                     return -1;
1094                 };
1095                 setlocale(LC_NUMERIC, old_locale);
1096                 if (endptr[0] != '\0') {
1097                     rrd_set_error
1098                         ("conversion of '%s' to float not complete: tail '%s'",
1099                          updvals[ds_idx + 1], endptr);
1100                     return -1;
1101                 }
1102                 rate = pdp_new[ds_idx] / interval;
1103                 break;
1104             case DST_GAUGE:
1105                 old_locale = setlocale(LC_NUMERIC, NULL);
1106                 setlocale(LC_NUMERIC, "C");
1107                 errno = 0;
1108                 pdp_new[ds_idx] =
1109                     strtod(updvals[ds_idx + 1], &endptr) * interval;
1110                 if (errno) {
1111                     rrd_set_error("converting '%s' to float: %s",
1112                                   updvals[ds_idx + 1], rrd_strerror(errno));
1113                     return -1;
1114                 };
1115                 setlocale(LC_NUMERIC, old_locale);
1116                 if (endptr[0] != '\0') {
1117                     rrd_set_error
1118                         ("conversion of '%s' to float not complete: tail '%s'",
1119                          updvals[ds_idx + 1], endptr);
1120                     return -1;
1121                 }
1122                 rate = pdp_new[ds_idx] / interval;
1123                 break;
1124             default:
1125                 rrd_set_error("rrd contains unknown DS type : '%s'",
1126                               rrd->ds_def[ds_idx].dst);
1127                 return -1;
1128             }
1129             /* break out of this for loop if the error string is set */
1130             if (rrd_test_error()) {
1131                 return -1;
1132             }
1133             /* make sure pdp_temp is neither too large or too small
1134              * if any of these occur it becomes unknown ...
1135              * sorry folks ... */
1136             if (!isnan(rate) &&
1137                 ((!isnan(rrd->ds_def[ds_idx].par[DS_max_val].u_val) &&
1138                   rate > rrd->ds_def[ds_idx].par[DS_max_val].u_val) ||
1139                  (!isnan(rrd->ds_def[ds_idx].par[DS_min_val].u_val) &&
1140                   rate < rrd->ds_def[ds_idx].par[DS_min_val].u_val))) {
1141                 pdp_new[ds_idx] = DNAN;
1142             }
1143         } else {
1144             /* no news is news all the same */
1145             pdp_new[ds_idx] = DNAN;
1146         }
1149         /* make a copy of the command line argument for the next run */
1150 #ifdef DEBUG
1151         fprintf(stderr, "prep ds[%lu]\t"
1152                 "last_arg '%s'\t"
1153                 "this_arg '%s'\t"
1154                 "pdp_new %10.2f\n",
1155                 ds_idx, rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1156                 pdp_new[ds_idx]);
1157 #endif
1158         strncpy(rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1159                 LAST_DS_LEN - 1);
1160         rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
1161     }
1162     return 0;
1165 /*
1166  * How many PDP steps have elapsed since the last update? Returns the answer,
1167  * and stores the time between the last update and the last PDP in pre_time,
1168  * and the time between the last PDP and the current time in post_int.
1169  */
1170 static int calculate_elapsed_steps(
1171     rrd_t *rrd,
1172     unsigned long current_time,
1173     unsigned long current_time_usec,
1174     double interval,
1175     double *pre_int,
1176     double *post_int,
1177     unsigned long *proc_pdp_cnt)
1179     unsigned long proc_pdp_st;  /* which pdp_st was the last to be processed */
1180     unsigned long occu_pdp_st;  /* when was the pdp_st before the last update
1181                                  * time */
1182     unsigned long proc_pdp_age; /* how old was the data in the pdp prep area 
1183                                  * when it was last updated */
1184     unsigned long occu_pdp_age; /* how long ago was the last pdp_step time */
1186     /* when was the current pdp started */
1187     proc_pdp_age = rrd->live_head->last_up % rrd->stat_head->pdp_step;
1188     proc_pdp_st = rrd->live_head->last_up - proc_pdp_age;
1190     /* when did the last pdp_st occur */
1191     occu_pdp_age = current_time % rrd->stat_head->pdp_step;
1192     occu_pdp_st = current_time - occu_pdp_age;
1194     if (occu_pdp_st > proc_pdp_st) {
1195         /* OK we passed the pdp_st moment */
1196         *pre_int = (long) occu_pdp_st - rrd->live_head->last_up;    /* how much of the input data
1197                                                                      * occurred before the latest
1198                                                                      * pdp_st moment*/
1199         *pre_int -= ((double) rrd->live_head->last_up_usec) / 1e6f; /* adjust usecs */
1200         *post_int = occu_pdp_age;   /* how much after it */
1201         *post_int += ((double) current_time_usec) / 1e6f;   /* adjust usecs */
1202     } else {
1203         *pre_int = interval;
1204         *post_int = 0;
1205     }
1207     *proc_pdp_cnt = proc_pdp_st / rrd->stat_head->pdp_step;
1209 #ifdef DEBUG
1210     printf("proc_pdp_age %lu\t"
1211            "proc_pdp_st %lu\t"
1212            "occu_pfp_age %lu\t"
1213            "occu_pdp_st %lu\t"
1214            "int %lf\t"
1215            "pre_int %lf\t"
1216            "post_int %lf\n", proc_pdp_age, proc_pdp_st,
1217            occu_pdp_age, occu_pdp_st, interval, *pre_int, *post_int);
1218 #endif
1220     /* compute the number of elapsed pdp_st moments */
1221     return (occu_pdp_st - proc_pdp_st) / rrd->stat_head->pdp_step;
1224 /*
1225  * Increment the PDP values by the values in pdp_new, or else initialize them.
1226  */
1227 static void simple_update(
1228     rrd_t *rrd,
1229     double interval,
1230     rrd_value_t *pdp_new)
1232     int       i;
1234     for (i = 0; i < (signed) rrd->stat_head->ds_cnt; i++) {
1235         if (isnan(pdp_new[i])) {
1236             /* this is not really accurate if we use subsecond data arrival time
1237                should have thought of it when going subsecond resolution ...
1238                sorry next format change we will have it! */
1239             rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt +=
1240                 floor(interval);
1241         } else {
1242             if (isnan(rrd->pdp_prep[i].scratch[PDP_val].u_val)) {
1243                 rrd->pdp_prep[i].scratch[PDP_val].u_val = pdp_new[i];
1244             } else {
1245                 rrd->pdp_prep[i].scratch[PDP_val].u_val += pdp_new[i];
1246             }
1247         }
1248 #ifdef DEBUG
1249         fprintf(stderr,
1250                 "NO PDP  ds[%i]\t"
1251                 "value %10.2f\t"
1252                 "unkn_sec %5lu\n",
1253                 i,
1254                 rrd->pdp_prep[i].scratch[PDP_val].u_val,
1255                 rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
1256 #endif
1257     }
1260 /*
1261  * Call process_pdp_st for each DS.
1262  *
1263  * Returns 0 on success, -1 on error.
1264  */
1265 static int process_all_pdp_st(
1266     rrd_t *rrd,
1267     double interval,
1268     double pre_int,
1269     double post_int,
1270     unsigned long elapsed_pdp_st,
1271     rrd_value_t *pdp_new,
1272     rrd_value_t *pdp_temp)
1274     unsigned long ds_idx;
1276     /* in pdp_prep[].scratch[PDP_val].u_val we have collected
1277        rate*seconds which occurred up to the last run.
1278        pdp_new[] contains rate*seconds from the latest run.
1279        pdp_temp[] will contain the rate for cdp */
1281     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1282         if (process_pdp_st(rrd, ds_idx, interval, pre_int, post_int,
1283                            elapsed_pdp_st * rrd->stat_head->pdp_step,
1284                            pdp_new, pdp_temp) == -1) {
1285             return -1;
1286         }
1287 #ifdef DEBUG
1288         fprintf(stderr, "PDP UPD ds[%lu]\t"
1289                 "elapsed_pdp_st %lu\t"
1290                 "pdp_temp %10.2f\t"
1291                 "new_prep %10.2f\t"
1292                 "new_unkn_sec %5lu\n",
1293                 ds_idx,
1294                 elapsed_pdp_st,
1295                 pdp_temp[ds_idx],
1296                 rrd->pdp_prep[ds_idx].scratch[PDP_val].u_val,
1297                 rrd->pdp_prep[ds_idx].scratch[PDP_unkn_sec_cnt].u_cnt);
1298 #endif
1299     }
1300     return 0;
1303 /*
1304  * Process an update that occurs after one of the PDP moments.
1305  * Increments the PDP value, sets NAN if time greater than the
1306  * heartbeats have elapsed, processes CDEFs.
1307  *
1308  * Returns 0 on success, -1 on error.
1309  */
1310 static int process_pdp_st(
1311     rrd_t *rrd,
1312     unsigned long ds_idx,
1313     double interval,
1314     double pre_int,
1315     double post_int,
1316     long diff_pdp_st,   /* number of seconds in full steps passed since last update */
1317     rrd_value_t *pdp_new,
1318     rrd_value_t *pdp_temp)
1320     int       i;
1322     /* update pdp_prep to the current pdp_st. */
1323     double    pre_unknown = 0.0;
1324     unival   *scratch = rrd->pdp_prep[ds_idx].scratch;
1325     unsigned long mrhb = rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt;
1327     rpnstack_t rpnstack;    /* used for COMPUTE DS */
1329     rpnstack_init(&rpnstack);
1332     if (isnan(pdp_new[ds_idx])) {
1333         /* a final bit of unknown to be added before calculation
1334            we use a temporary variable for this so that we
1335            don't have to turn integer lines before using the value */
1336         pre_unknown = pre_int;
1337     } else {
1338         if (isnan(scratch[PDP_val].u_val)) {
1339             scratch[PDP_val].u_val = 0;
1340         }
1341         scratch[PDP_val].u_val += pdp_new[ds_idx] / interval * pre_int;
1342     }
1344     /* if too much of the pdp_prep is unknown we dump it */
1345     /* if the interval is larger thatn mrhb we get NAN */
1346     if ((interval > mrhb) ||
1347         (rrd->stat_head->pdp_step / 2.0 <
1348          (signed) scratch[PDP_unkn_sec_cnt].u_cnt)) {
1349         pdp_temp[ds_idx] = DNAN;
1350     } else {
1351         pdp_temp[ds_idx] = scratch[PDP_val].u_val /
1352             ((double) (diff_pdp_st - scratch[PDP_unkn_sec_cnt].u_cnt) -
1353              pre_unknown);
1354     }
1356     /* process CDEF data sources; remember each CDEF DS can
1357      * only reference other DS with a lower index number */
1358     if (dst_conv(rrd->ds_def[ds_idx].dst) == DST_CDEF) {
1359         rpnp_t   *rpnp;
1361         rpnp =
1362             rpn_expand((rpn_cdefds_t *) &(rrd->ds_def[ds_idx].par[DS_cdef]));
1363         if(rpnp == NULL) {
1364           rpnstack_free(&rpnstack);
1365           return -1;
1366         }
1367         /* substitute data values for OP_VARIABLE nodes */
1368         for (i = 0; rpnp[i].op != OP_END; i++) {
1369             if (rpnp[i].op == OP_VARIABLE) {
1370                 rpnp[i].op = OP_NUMBER;
1371                 rpnp[i].val = pdp_temp[rpnp[i].ptr];
1372             }
1373         }
1374         /* run the rpn calculator */
1375         if (rpn_calc(rpnp, &rpnstack, 0, pdp_temp, ds_idx) == -1) {
1376             free(rpnp);
1377             rpnstack_free(&rpnstack);
1378             return -1;
1379         }
1380         free(rpnp);
1381     }
1383     /* make pdp_prep ready for the next run */
1384     if (isnan(pdp_new[ds_idx])) {
1385         /* this is not realy accurate if we use subsecond data arival time
1386            should have thought of it when going subsecond resolution ...
1387            sorry next format change we will have it! */
1388         scratch[PDP_unkn_sec_cnt].u_cnt = floor(post_int);
1389         scratch[PDP_val].u_val = DNAN;
1390     } else {
1391         scratch[PDP_unkn_sec_cnt].u_cnt = 0;
1392         scratch[PDP_val].u_val = pdp_new[ds_idx] / interval * post_int;
1393     }
1394     rpnstack_free(&rpnstack);
1395     return 0;
1398 /*
1399  * Iterate over all the RRAs for a given DS and:
1400  * 1. Decide whether to schedule a smooth later
1401  * 2. Decide whether to skip updating SEASONAL and DEVSEASONAL
1402  * 3. Update the CDP
1403  *
1404  * Returns 0 on success, -1 on error
1405  */
1406 static int update_all_cdp_prep(
1407     rrd_t *rrd,
1408     unsigned long *rra_step_cnt,
1409     unsigned long rra_begin,
1410     rrd_file_t *rrd_file,
1411     unsigned long elapsed_pdp_st,
1412     unsigned long proc_pdp_cnt,
1413     rrd_value_t **last_seasonal_coef,
1414     rrd_value_t **seasonal_coef,
1415     rrd_value_t *pdp_temp,
1416     unsigned long *skip_update,
1417     int *schedule_smooth)
1419     unsigned long rra_idx;
1421     /* index into the CDP scratch array */
1422     enum cf_en current_cf;
1423     unsigned long rra_start;
1425     /* number of rows to be updated in an RRA for a data value. */
1426     unsigned long start_pdp_offset;
1428     rra_start = rra_begin;
1429     for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1430         current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1431         start_pdp_offset =
1432             rrd->rra_def[rra_idx].pdp_cnt -
1433             proc_pdp_cnt % rrd->rra_def[rra_idx].pdp_cnt;
1434         skip_update[rra_idx] = 0;
1435         if (start_pdp_offset <= elapsed_pdp_st) {
1436             rra_step_cnt[rra_idx] = (elapsed_pdp_st - start_pdp_offset) /
1437                 rrd->rra_def[rra_idx].pdp_cnt + 1;
1438         } else {
1439             rra_step_cnt[rra_idx] = 0;
1440         }
1442         if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1443             /* If this is a bulk update, we need to skip ahead in the seasonal arrays
1444              * so that they will be correct for the next observed value; note that for
1445              * the bulk update itself, no update will occur to DEVSEASONAL or SEASONAL;
1446              * futhermore, HWPREDICT and DEVPREDICT will be set to DNAN. */
1447             if (rra_step_cnt[rra_idx] > 1) {
1448                 skip_update[rra_idx] = 1;
1449                 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1450                                 elapsed_pdp_st, last_seasonal_coef);
1451                 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1452                                 elapsed_pdp_st + 1, seasonal_coef);
1453             }
1454             /* periodically run a smoother for seasonal effects */
1455             if (do_schedule_smooth(rrd, rra_idx, elapsed_pdp_st)) {
1456 #ifdef DEBUG
1457                 fprintf(stderr,
1458                         "schedule_smooth: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
1459                         rrd->rra_ptr[rra_idx].cur_row, elapsed_pdp_st,
1460                         rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].
1461                         u_cnt);
1462 #endif
1463                 *schedule_smooth = 1;
1464             }
1465         }
1466         if (rrd_test_error())
1467             return -1;
1469         if (update_cdp_prep
1470             (rrd, elapsed_pdp_st, start_pdp_offset, rra_step_cnt, rra_idx,
1471              pdp_temp, *last_seasonal_coef, *seasonal_coef,
1472              current_cf) == -1) {
1473             return -1;
1474         }
1475         rra_start +=
1476             rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt *
1477             sizeof(rrd_value_t);
1478     }
1479     return 0;
1482 /* 
1483  * Are we due for a smooth? Also increments our position in the burn-in cycle.
1484  */
1485 static int do_schedule_smooth(
1486     rrd_t *rrd,
1487     unsigned long rra_idx,
1488     unsigned long elapsed_pdp_st)
1490     unsigned long cdp_idx = rra_idx * (rrd->stat_head->ds_cnt);
1491     unsigned long cur_row = rrd->rra_ptr[rra_idx].cur_row;
1492     unsigned long row_cnt = rrd->rra_def[rra_idx].row_cnt;
1493     unsigned long seasonal_smooth_idx =
1494         rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].u_cnt;
1495     unsigned long *init_seasonal =
1496         &(rrd->cdp_prep[cdp_idx].scratch[CDP_init_seasonal].u_cnt);
1498     /* Need to use first cdp parameter buffer to track burnin (burnin requires
1499      * a specific smoothing schedule).  The CDP_init_seasonal parameter is
1500      * really an RRA level, not a data source within RRA level parameter, but
1501      * the rra_def is read only for rrd_update (not flushed to disk). */
1502     if (*init_seasonal > BURNIN_CYCLES) {
1503         /* someone has no doubt invented a trick to deal with this wrap around,
1504          * but at least this code is clear. */
1505         if (seasonal_smooth_idx > cur_row) {
1506             /* here elapsed_pdp_st = rra_step_cnt[rra_idx] because of 1-1 mapping
1507              * between PDP and CDP */
1508             return (cur_row + elapsed_pdp_st >= seasonal_smooth_idx);
1509         }
1510         /* can't rely on negative numbers because we are working with
1511          * unsigned values */
1512         return (cur_row + elapsed_pdp_st >= row_cnt
1513                 && cur_row + elapsed_pdp_st >= row_cnt + seasonal_smooth_idx);
1514     }
1515     /* mark off one of the burn-in cycles */
1516     return (cur_row + elapsed_pdp_st >= row_cnt && ++(*init_seasonal));
1519 /*
1520  * For a given RRA, iterate over the data sources and call the appropriate
1521  * consolidation function.
1522  *
1523  * Returns 0 on success, -1 on error.
1524  */
1525 static int update_cdp_prep(
1526     rrd_t *rrd,
1527     unsigned long elapsed_pdp_st,
1528     unsigned long start_pdp_offset,
1529     unsigned long *rra_step_cnt,
1530     int rra_idx,
1531     rrd_value_t *pdp_temp,
1532     rrd_value_t *last_seasonal_coef,
1533     rrd_value_t *seasonal_coef,
1534     int current_cf)
1536     unsigned long ds_idx, cdp_idx;
1538     /* update CDP_PREP areas */
1539     /* loop over data soures within each RRA */
1540     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1542         cdp_idx = rra_idx * rrd->stat_head->ds_cnt + ds_idx;
1544         if (rrd->rra_def[rra_idx].pdp_cnt > 1) {
1545             update_cdp(rrd->cdp_prep[cdp_idx].scratch, current_cf,
1546                        pdp_temp[ds_idx], rra_step_cnt[rra_idx],
1547                        elapsed_pdp_st, start_pdp_offset,
1548                        rrd->rra_def[rra_idx].pdp_cnt,
1549                        rrd->rra_def[rra_idx].par[RRA_cdp_xff_val].u_val,
1550                        rra_idx, ds_idx);
1551         } else {
1552             /* Nothing to consolidate if there's one PDP per CDP. However, if
1553              * we've missed some PDPs, let's update null counters etc. */
1554             if (elapsed_pdp_st > 2) {
1555                 reset_cdp(rrd, elapsed_pdp_st, pdp_temp, last_seasonal_coef,
1556                           seasonal_coef, rra_idx, ds_idx, cdp_idx,
1557                           (enum cf_en)current_cf);
1558             }
1559         }
1561         if (rrd_test_error())
1562             return -1;
1563     }                   /* endif data sources loop */
1564     return 0;
1567 /*
1568  * Given the new reading (pdp_temp_val), update or initialize the CDP value,
1569  * primary value, secondary value, and # of unknowns.
1570  */
1571 static void update_cdp(
1572     unival *scratch,
1573     int current_cf,
1574     rrd_value_t pdp_temp_val,
1575     unsigned long rra_step_cnt,
1576     unsigned long elapsed_pdp_st,
1577     unsigned long start_pdp_offset,
1578     unsigned long pdp_cnt,
1579     rrd_value_t xff,
1580     int i,
1581     int ii)
1583     /* shorthand variables */
1584     rrd_value_t *cdp_val = &scratch[CDP_val].u_val;
1585     rrd_value_t *cdp_primary_val = &scratch[CDP_primary_val].u_val;
1586     rrd_value_t *cdp_secondary_val = &scratch[CDP_secondary_val].u_val;
1587     unsigned long *cdp_unkn_pdp_cnt = &scratch[CDP_unkn_pdp_cnt].u_cnt;
1589     if (rra_step_cnt) {
1590         /* If we are in this block, as least 1 CDP value will be written to
1591          * disk, this is the CDP_primary_val entry. If more than 1 value needs
1592          * to be written, then the "fill in" value is the CDP_secondary_val
1593          * entry. */
1594         if (isnan(pdp_temp_val)) {
1595             *cdp_unkn_pdp_cnt += start_pdp_offset;
1596             *cdp_secondary_val = DNAN;
1597         } else {
1598             /* CDP_secondary value is the RRA "fill in" value for intermediary
1599              * CDP data entries. No matter the CF, the value is the same because
1600              * the average, max, min, and last of a list of identical values is
1601              * the same, namely, the value itself. */
1602             *cdp_secondary_val = pdp_temp_val;
1603         }
1605         if (*cdp_unkn_pdp_cnt > pdp_cnt * xff) {
1606             *cdp_primary_val = DNAN;
1607         } else {
1608             initialize_cdp_val(scratch, current_cf, pdp_temp_val,
1609                                start_pdp_offset, pdp_cnt);
1610         }
1611         *cdp_val =
1612             initialize_carry_over(pdp_temp_val,current_cf,
1613                                   elapsed_pdp_st,
1614                                   start_pdp_offset, pdp_cnt);
1615                /* endif meets xff value requirement for a valid value */
1616         /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
1617          * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1618         if (isnan(pdp_temp_val))
1619             *cdp_unkn_pdp_cnt = (elapsed_pdp_st - start_pdp_offset) % pdp_cnt;
1620         else
1621             *cdp_unkn_pdp_cnt = 0;
1622     } else {            /* rra_step_cnt[i]  == 0 */
1624 #ifdef DEBUG
1625         if (isnan(*cdp_val)) {
1626             fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, DNAN\n",
1627                     i, ii);
1628         } else {
1629             fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, %10.2f\n",
1630                     i, ii, *cdp_val);
1631         }
1632 #endif
1633         if (isnan(pdp_temp_val)) {
1634             *cdp_unkn_pdp_cnt += elapsed_pdp_st;
1635         } else {
1636             *cdp_val =
1637                 calculate_cdp_val(*cdp_val, pdp_temp_val, elapsed_pdp_st,
1638                                   current_cf, i, ii);
1639         }
1640     }
1643 /*
1644  * Set the CDP_primary_val and CDP_val to the appropriate initial value based
1645  * on the type of consolidation function.
1646  */
1647 static void initialize_cdp_val(
1648     unival *scratch,
1649     int current_cf,
1650     rrd_value_t pdp_temp_val,
1651     unsigned long start_pdp_offset,
1652     unsigned long pdp_cnt)
1654     rrd_value_t cum_val, cur_val;
1656     switch (current_cf) {
1657     case CF_AVERAGE:
1658         cum_val = IFDNAN(scratch[CDP_val].u_val, 0.0);
1659         cur_val = IFDNAN(pdp_temp_val, 0.0);
1660         scratch[CDP_primary_val].u_val =
1661             (cum_val + cur_val * start_pdp_offset) /
1662             (pdp_cnt - scratch[CDP_unkn_pdp_cnt].u_cnt);
1663         break;
1664     case CF_MAXIMUM: 
1665         cum_val = IFDNAN(scratch[CDP_val].u_val, -DINF);
1666         cur_val = IFDNAN(pdp_temp_val, -DINF);
1668 #if 0
1669 #ifdef DEBUG
1670         if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1671             fprintf(stderr,
1672                     "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1673                     i, ii);
1674             exit(-1);
1675         }
1676 #endif
1677 #endif
1678         if (cur_val > cum_val)
1679             scratch[CDP_primary_val].u_val = cur_val;
1680         else
1681             scratch[CDP_primary_val].u_val = cum_val;
1682         break;
1683     case CF_MINIMUM:
1684         cum_val = IFDNAN(scratch[CDP_val].u_val, DINF);
1685         cur_val = IFDNAN(pdp_temp_val, DINF);
1686 #if 0
1687 #ifdef DEBUG
1688         if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1689             fprintf(stderr,
1690                     "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!", i,
1691                     ii);
1692             exit(-1);
1693         }
1694 #endif
1695 #endif
1696         if (cur_val < cum_val)
1697             scratch[CDP_primary_val].u_val = cur_val;
1698         else
1699             scratch[CDP_primary_val].u_val = cum_val;
1700         break;
1701     case CF_LAST:
1702     default:
1703         scratch[CDP_primary_val].u_val = pdp_temp_val;
1704         break;
1705     }
1708 /*
1709  * Update the consolidation function for Holt-Winters functions as
1710  * well as other functions that don't actually consolidate multiple
1711  * PDPs.
1712  */
1713 static void reset_cdp(
1714     rrd_t *rrd,
1715     unsigned long elapsed_pdp_st,
1716     rrd_value_t *pdp_temp,
1717     rrd_value_t *last_seasonal_coef,
1718     rrd_value_t *seasonal_coef,
1719     int rra_idx,
1720     int ds_idx,
1721     int cdp_idx,
1722     enum cf_en current_cf)
1724     unival   *scratch = rrd->cdp_prep[cdp_idx].scratch;
1726     switch (current_cf) {
1727     case CF_AVERAGE:
1728     default:
1729         scratch[CDP_primary_val].u_val = pdp_temp[ds_idx];
1730         scratch[CDP_secondary_val].u_val = pdp_temp[ds_idx];
1731         break;
1732     case CF_SEASONAL:
1733     case CF_DEVSEASONAL:
1734         /* need to update cached seasonal values, so they are consistent
1735          * with the bulk update */
1736         /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1737          * CDP_last_deviation are the same. */
1738         scratch[CDP_hw_last_seasonal].u_val = last_seasonal_coef[ds_idx];
1739         scratch[CDP_hw_seasonal].u_val = seasonal_coef[ds_idx];
1740         break;
1741     case CF_HWPREDICT:
1742     case CF_MHWPREDICT:
1743         /* need to update the null_count and last_null_count.
1744          * even do this for non-DNAN pdp_temp because the
1745          * algorithm is not learning from batch updates. */
1746         scratch[CDP_null_count].u_cnt += elapsed_pdp_st;
1747         scratch[CDP_last_null_count].u_cnt += elapsed_pdp_st - 1;
1748         /* fall through */
1749     case CF_DEVPREDICT:
1750         scratch[CDP_primary_val].u_val = DNAN;
1751         scratch[CDP_secondary_val].u_val = DNAN;
1752         break;
1753     case CF_FAILURES:
1754         /* do not count missed bulk values as failures */
1755         scratch[CDP_primary_val].u_val = 0;
1756         scratch[CDP_secondary_val].u_val = 0;
1757         /* need to reset violations buffer.
1758          * could do this more carefully, but for now, just
1759          * assume a bulk update wipes away all violations. */
1760         erase_violations(rrd, cdp_idx, rra_idx);
1761         break;
1762     }
1765 static rrd_value_t initialize_carry_over(
1766     rrd_value_t pdp_temp_val,
1767     int current_cf,
1768     unsigned long elapsed_pdp_st,
1769     unsigned long start_pdp_offset,
1770     unsigned long pdp_cnt)
1772     unsigned long pdp_into_cdp_cnt = ((elapsed_pdp_st - start_pdp_offset) % pdp_cnt);
1773     if ( pdp_into_cdp_cnt == 0 || isnan(pdp_temp_val)){
1774         switch (current_cf) {
1775         case CF_MAXIMUM:
1776             return -DINF;
1777         case CF_MINIMUM:
1778             return DINF;
1779         case CF_AVERAGE:
1780             return 0;
1781         default:
1782             return DNAN;
1783         }        
1784     } 
1785     else {
1786         switch (current_cf) {
1787         case CF_AVERAGE:
1788             return pdp_temp_val *  pdp_into_cdp_cnt ;
1789         default:
1790             return pdp_temp_val;
1791         }        
1792     }        
1795 /*
1796  * Update or initialize a CDP value based on the consolidation
1797  * function.
1798  *
1799  * Returns the new value.
1800  */
1801 static rrd_value_t calculate_cdp_val(
1802     rrd_value_t cdp_val,
1803     rrd_value_t pdp_temp_val,
1804     unsigned long elapsed_pdp_st,
1805     int current_cf,
1806 #ifdef DEBUG
1807     int i,
1808     int ii
1809 #else
1810     int UNUSED(i),
1811     int UNUSED(ii)
1812 #endif
1813     )
1815     if (isnan(cdp_val)) {
1816         if (current_cf == CF_AVERAGE) {
1817             pdp_temp_val *= elapsed_pdp_st;
1818         }
1819 #ifdef DEBUG
1820         fprintf(stderr, "Initialize CDP_val for RRA %d DS %d: %10.2f\n",
1821                 i, ii, pdp_temp_val);
1822 #endif
1823         return pdp_temp_val;
1824     }
1825     if (current_cf == CF_AVERAGE)
1826         return cdp_val + pdp_temp_val * elapsed_pdp_st;
1827     if (current_cf == CF_MINIMUM)
1828         return (pdp_temp_val < cdp_val) ? pdp_temp_val : cdp_val;
1829     if (current_cf == CF_MAXIMUM)
1830         return (pdp_temp_val > cdp_val) ? pdp_temp_val : cdp_val;
1832     return pdp_temp_val;
1835 /*
1836  * For each RRA, update the seasonal values and then call update_aberrant_CF
1837  * for each data source.
1838  *
1839  * Return 0 on success, -1 on error.
1840  */
1841 static int update_aberrant_cdps(
1842     rrd_t *rrd,
1843     rrd_file_t *rrd_file,
1844     unsigned long rra_begin,
1845     unsigned long elapsed_pdp_st,
1846     rrd_value_t *pdp_temp,
1847     rrd_value_t **seasonal_coef)
1849     unsigned long rra_idx, ds_idx, j;
1851     /* number of PDP steps since the last update that
1852      * are assigned to the first CDP to be generated
1853      * since the last update. */
1854     unsigned short scratch_idx;
1855     unsigned long rra_start;
1856     enum cf_en current_cf;
1858     /* this loop is only entered if elapsed_pdp_st < 3 */
1859     for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1860          j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val) {
1861         rra_start = rra_begin;
1862         for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1863             if (rrd->rra_def[rra_idx].pdp_cnt == 1) {
1864                 current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1865                 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1866                     if (scratch_idx == CDP_primary_val) {
1867                         lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1868                                         elapsed_pdp_st + 1, seasonal_coef);
1869                     } else {
1870                         lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1871                                         elapsed_pdp_st + 2, seasonal_coef);
1872                     }
1873                 }
1874                 if (rrd_test_error())
1875                     return -1;
1876                 /* loop over data soures within each RRA */
1877                 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1878                     update_aberrant_CF(rrd, pdp_temp[ds_idx], current_cf,
1879                                        rra_idx * (rrd->stat_head->ds_cnt) +
1880                                        ds_idx, rra_idx, ds_idx, scratch_idx,
1881                                        *seasonal_coef);
1882                 }
1883             }
1884             rra_start += rrd->rra_def[rra_idx].row_cnt
1885                 * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
1886         }
1887     }
1888     return 0;
1891 /* 
1892  * Move sequentially through the file, writing one RRA at a time.  Note this
1893  * architecture divorces the computation of CDP with flushing updated RRA
1894  * entries to disk.
1895  *
1896  * Return 0 on success, -1 on error.
1897  */
1898 static int write_to_rras(
1899     rrd_t *rrd,
1900     rrd_file_t *rrd_file,
1901     unsigned long *rra_step_cnt,
1902     unsigned long rra_begin,
1903     time_t current_time,
1904     unsigned long *skip_update,
1905     rrd_info_t ** pcdp_summary)
1907     unsigned long rra_idx;
1908     unsigned long rra_start;
1909     time_t    rra_time = 0; /* time of update for a RRA */
1911     unsigned long ds_cnt = rrd->stat_head->ds_cnt;
1912     
1913     /* Ready to write to disk */
1914     rra_start = rra_begin;
1916     for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1917         rra_def_t *rra_def = &rrd->rra_def[rra_idx];
1918         rra_ptr_t *rra_ptr = &rrd->rra_ptr[rra_idx];
1920         /* for cdp_prep */
1921         unsigned short scratch_idx;
1922         unsigned long step_subtract;
1924         for (scratch_idx = CDP_primary_val,
1925                  step_subtract = 1;
1926              rra_step_cnt[rra_idx] > 0;
1927              rra_step_cnt[rra_idx]--,
1928                  scratch_idx = CDP_secondary_val,
1929                  step_subtract = 2) {
1931             size_t rra_pos_new;
1932 #ifdef DEBUG
1933             fprintf(stderr, "  -- RRA Preseek %ld\n", rrd_file->pos);
1934 #endif
1935             /* increment, with wrap-around */
1936             if (++rra_ptr->cur_row >= rra_def->row_cnt)
1937               rra_ptr->cur_row = 0;
1939             /* we know what our position should be */
1940             rra_pos_new = rra_start
1941               + ds_cnt * rra_ptr->cur_row * sizeof(rrd_value_t);
1943             /* re-seek if the position is wrong or we wrapped around */
1944             if ((size_t)rra_pos_new != rrd_file->pos) {
1945                 if (rrd_seek(rrd_file, rra_pos_new, SEEK_SET) != 0) {
1946                     rrd_set_error("seek error in rrd");
1947                     return -1;
1948                 }
1949             }
1950 #ifdef DEBUG
1951             fprintf(stderr, "  -- RRA Postseek %ld\n", rrd_file->pos);
1952 #endif
1954             if (skip_update[rra_idx])
1955                 continue;
1957             if (*pcdp_summary != NULL) {
1958                 unsigned long step_time = rra_def->pdp_cnt * rrd->stat_head->pdp_step;
1960                 rra_time = (current_time - current_time % step_time)
1961                     - ((rra_step_cnt[rra_idx] - step_subtract) * step_time);
1962             }
1964             if (write_RRA_row
1965                 (rrd_file, rrd, rra_idx, scratch_idx,
1966                  pcdp_summary, rra_time) == -1)
1967                 return -1;
1969             rrd_notify_row(rrd_file, rra_idx, rra_pos_new, rra_time);
1970         }
1972         rra_start += rra_def->row_cnt * ds_cnt * sizeof(rrd_value_t);
1973     } /* RRA LOOP */
1975     return 0;
1978 /*
1979  * Write out one row of values (one value per DS) to the archive.
1980  *
1981  * Returns 0 on success, -1 on error.
1982  */
1983 static int write_RRA_row(
1984     rrd_file_t *rrd_file,
1985     rrd_t *rrd,
1986     unsigned long rra_idx,
1987     unsigned short CDP_scratch_idx,
1988     rrd_info_t ** pcdp_summary,
1989     time_t rra_time)
1991     unsigned long ds_idx, cdp_idx;
1992     rrd_infoval_t iv;
1994     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1995         /* compute the cdp index */
1996         cdp_idx = rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
1997 #ifdef DEBUG
1998         fprintf(stderr, "  -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1999                 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,
2000                 rrd_file->pos, rrd->rra_def[rra_idx].cf_nam);
2001 #endif
2002         if (*pcdp_summary != NULL) {
2003             iv.u_val = rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
2004             /* append info to the return hash */
2005             *pcdp_summary = rrd_info_push(*pcdp_summary,
2006                                           sprintf_alloc
2007                                           ("[%lli]RRA[%s][%lu]DS[%s]", 
2008                                            (long long)rra_time,
2009                                            rrd->rra_def[rra_idx].cf_nam,
2010                                            rrd->rra_def[rra_idx].pdp_cnt,
2011                                            rrd->ds_def[ds_idx].ds_nam),
2012                                            RD_I_VAL, iv);
2013         }
2014         errno = 0;
2015         if (rrd_write(rrd_file,
2016                       &(rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].
2017                         u_val), sizeof(rrd_value_t)) != sizeof(rrd_value_t)) {
2018             rrd_set_error("writing rrd: %s", rrd_strerror(errno));
2019             return -1;
2020         }
2021     }
2022     return 0;
2025 /*
2026  * Call apply_smoother for all DEVSEASONAL and SEASONAL RRAs.
2027  *
2028  * Returns 0 on success, -1 otherwise
2029  */
2030 static int smooth_all_rras(
2031     rrd_t *rrd,
2032     rrd_file_t *rrd_file,
2033     unsigned long rra_begin)
2035     unsigned long rra_start = rra_begin;
2036     unsigned long rra_idx;
2038     for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; ++rra_idx) {
2039         if (cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_DEVSEASONAL ||
2040             cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_SEASONAL) {
2041 #ifdef DEBUG
2042             fprintf(stderr, "Running smoother for rra %lu\n", rra_idx);
2043 #endif
2044             apply_smoother(rrd, rra_idx, rra_start, rrd_file);
2045             if (rrd_test_error())
2046                 return -1;
2047         }
2048         rra_start += rrd->rra_def[rra_idx].row_cnt
2049             * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
2050     }
2051     return 0;
2054 #ifndef HAVE_MMAP
2055 /*
2056  * Flush changes to disk (unless we're using mmap)
2057  *
2058  * Returns 0 on success, -1 otherwise
2059  */
2060 static int write_changes_to_disk(
2061     rrd_t *rrd,
2062     rrd_file_t *rrd_file,
2063     int version)
2065     /* we just need to write back the live header portion now */
2066     if (rrd_seek(rrd_file, (sizeof(stat_head_t)
2067                             + sizeof(ds_def_t) * rrd->stat_head->ds_cnt
2068                             + sizeof(rra_def_t) * rrd->stat_head->rra_cnt),
2069                  SEEK_SET) != 0) {
2070         rrd_set_error("seek rrd for live header writeback");
2071         return -1;
2072     }
2073     if (version >= 3) {
2074         if (rrd_write(rrd_file, rrd->live_head,
2075                       sizeof(live_head_t) * 1) != sizeof(live_head_t) * 1) {
2076             rrd_set_error("rrd_write live_head to rrd");
2077             return -1;
2078         }
2079     } else {
2080         if (rrd_write(rrd_file, rrd->legacy_last_up,
2081                       sizeof(time_t) * 1) != sizeof(time_t) * 1) {
2082             rrd_set_error("rrd_write live_head to rrd");
2083             return -1;
2084         }
2085     }
2088     if (rrd_write(rrd_file, rrd->pdp_prep,
2089                   sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)
2090         != (ssize_t) (sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)) {
2091         rrd_set_error("rrd_write pdp_prep to rrd");
2092         return -1;
2093     }
2095     if (rrd_write(rrd_file, rrd->cdp_prep,
2096                   sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2097                   rrd->stat_head->ds_cnt)
2098         != (ssize_t) (sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2099                       rrd->stat_head->ds_cnt)) {
2101         rrd_set_error("rrd_write cdp_prep to rrd");
2102         return -1;
2103     }
2105     if (rrd_write(rrd_file, rrd->rra_ptr,
2106                   sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)
2107         != (ssize_t) (sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)) {
2108         rrd_set_error("rrd_write rra_ptr to rrd");
2109         return -1;
2110     }
2111     return 0;
2113 #endif