Code

2b4a29300844208026abfca9331bc6222c78cb8a
[rrdtool.git] / src / rrd_update.c
1 /*****************************************************************************
2  * RRDtool 1.3.2  Copyright by Tobi Oetiker, 1997-2008
3  *                Copyright by Florian Forster, 2008
4  *****************************************************************************
5  * rrd_update.c  RRD Update Function
6  *****************************************************************************
7  * $Id$
8  *****************************************************************************/
10 #include "rrd_tool.h"
12 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
13 #include <sys/locking.h>
14 #include <sys/stat.h>
15 #include <io.h>
16 #endif
18 #include <locale.h>
20 #include "rrd_hw.h"
21 #include "rrd_rpncalc.h"
23 #include "rrd_is_thread_safe.h"
24 #include "unused.h"
26 #include "rrd_client.h"
28 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
29 /*
30  * WIN32 does not have gettimeofday     and struct timeval. This is a quick and dirty
31  * replacement.
32  */
33 #include <sys/timeb.h>
35 #ifndef __MINGW32__
36 struct timeval {
37     time_t    tv_sec;   /* seconds */
38     long      tv_usec;  /* microseconds */
39 };
40 #endif
42 struct __timezone {
43     int       tz_minuteswest;   /* minutes W of Greenwich */
44     int       tz_dsttime;   /* type of dst correction */
45 };
47 static int gettimeofday(
48     struct timeval *t,
49     struct __timezone *tz)
50 {
52     struct _timeb current_time;
54     _ftime(&current_time);
56     t->tv_sec = current_time.time;
57     t->tv_usec = current_time.millitm * 1000;
59     return 0;
60 }
62 #endif
64 /* FUNCTION PROTOTYPES */
66 int       rrd_update_r(
67     const char *filename,
68     const char *tmplt,
69     int argc,
70     const char **argv);
71 int       _rrd_update(
72     const char *filename,
73     const char *tmplt,
74     int argc,
75     const char **argv,
76     rrd_info_t *);
78 static int allocate_data_structures(
79     rrd_t *rrd,
80     char ***updvals,
81     rrd_value_t **pdp_temp,
82     const char *tmplt,
83     long **tmpl_idx,
84     unsigned long *tmpl_cnt,
85     unsigned long **rra_step_cnt,
86     unsigned long **skip_update,
87     rrd_value_t **pdp_new);
89 static int parse_template(
90     rrd_t *rrd,
91     const char *tmplt,
92     unsigned long *tmpl_cnt,
93     long *tmpl_idx);
95 static int process_arg(
96     char *step_start,
97     rrd_t *rrd,
98     rrd_file_t *rrd_file,
99     unsigned long rra_begin,
100     time_t *current_time,
101     unsigned long *current_time_usec,
102     rrd_value_t *pdp_temp,
103     rrd_value_t *pdp_new,
104     unsigned long *rra_step_cnt,
105     char **updvals,
106     long *tmpl_idx,
107     unsigned long tmpl_cnt,
108     rrd_info_t ** pcdp_summary,
109     int version,
110     unsigned long *skip_update,
111     int *schedule_smooth);
113 static int parse_ds(
114     rrd_t *rrd,
115     char **updvals,
116     long *tmpl_idx,
117     char *input,
118     unsigned long tmpl_cnt,
119     time_t *current_time,
120     unsigned long *current_time_usec,
121     int version);
123 static int get_time_from_reading(
124     rrd_t *rrd,
125     char timesyntax,
126     char **updvals,
127     time_t *current_time,
128     unsigned long *current_time_usec,
129     int version);
131 static int update_pdp_prep(
132     rrd_t *rrd,
133     char **updvals,
134     rrd_value_t *pdp_new,
135     double interval);
137 static int calculate_elapsed_steps(
138     rrd_t *rrd,
139     unsigned long current_time,
140     unsigned long current_time_usec,
141     double interval,
142     double *pre_int,
143     double *post_int,
144     unsigned long *proc_pdp_cnt);
146 static void simple_update(
147     rrd_t *rrd,
148     double interval,
149     rrd_value_t *pdp_new);
151 static int process_all_pdp_st(
152     rrd_t *rrd,
153     double interval,
154     double pre_int,
155     double post_int,
156     unsigned long elapsed_pdp_st,
157     rrd_value_t *pdp_new,
158     rrd_value_t *pdp_temp);
160 static int process_pdp_st(
161     rrd_t *rrd,
162     unsigned long ds_idx,
163     double interval,
164     double pre_int,
165     double post_int,
166     long diff_pdp_st,
167     rrd_value_t *pdp_new,
168     rrd_value_t *pdp_temp);
170 static int update_all_cdp_prep(
171     rrd_t *rrd,
172     unsigned long *rra_step_cnt,
173     unsigned long rra_begin,
174     rrd_file_t *rrd_file,
175     unsigned long elapsed_pdp_st,
176     unsigned long proc_pdp_cnt,
177     rrd_value_t **last_seasonal_coef,
178     rrd_value_t **seasonal_coef,
179     rrd_value_t *pdp_temp,
180     unsigned long *skip_update,
181     int *schedule_smooth);
183 static int do_schedule_smooth(
184     rrd_t *rrd,
185     unsigned long rra_idx,
186     unsigned long elapsed_pdp_st);
188 static int update_cdp_prep(
189     rrd_t *rrd,
190     unsigned long elapsed_pdp_st,
191     unsigned long start_pdp_offset,
192     unsigned long *rra_step_cnt,
193     int rra_idx,
194     rrd_value_t *pdp_temp,
195     rrd_value_t *last_seasonal_coef,
196     rrd_value_t *seasonal_coef,
197     int current_cf);
199 static void update_cdp(
200     unival *scratch,
201     int current_cf,
202     rrd_value_t pdp_temp_val,
203     unsigned long rra_step_cnt,
204     unsigned long elapsed_pdp_st,
205     unsigned long start_pdp_offset,
206     unsigned long pdp_cnt,
207     rrd_value_t xff,
208     int i,
209     int ii);
211 static void initialize_cdp_val(
212     unival *scratch,
213     int current_cf,
214     rrd_value_t pdp_temp_val,
215     unsigned long elapsed_pdp_st,
216     unsigned long start_pdp_offset,
217     unsigned long pdp_cnt);
219 static void reset_cdp(
220     rrd_t *rrd,
221     unsigned long elapsed_pdp_st,
222     rrd_value_t *pdp_temp,
223     rrd_value_t *last_seasonal_coef,
224     rrd_value_t *seasonal_coef,
225     int rra_idx,
226     int ds_idx,
227     int cdp_idx,
228     enum cf_en current_cf);
230 static rrd_value_t initialize_average_carry_over(
231     rrd_value_t pdp_temp_val,
232     unsigned long elapsed_pdp_st,
233     unsigned long start_pdp_offset,
234     unsigned long pdp_cnt);
236 static rrd_value_t calculate_cdp_val(
237     rrd_value_t cdp_val,
238     rrd_value_t pdp_temp_val,
239     unsigned long elapsed_pdp_st,
240     int current_cf,
241     int i,
242     int ii);
244 static int update_aberrant_cdps(
245     rrd_t *rrd,
246     rrd_file_t *rrd_file,
247     unsigned long rra_begin,
248     unsigned long elapsed_pdp_st,
249     rrd_value_t *pdp_temp,
250     rrd_value_t **seasonal_coef);
252 static int write_to_rras(
253     rrd_t *rrd,
254     rrd_file_t *rrd_file,
255     unsigned long *rra_step_cnt,
256     unsigned long rra_begin,
257     time_t current_time,
258     unsigned long *skip_update,
259     rrd_info_t ** pcdp_summary);
261 static int write_RRA_row(
262     rrd_file_t *rrd_file,
263     rrd_t *rrd,
264     unsigned long rra_idx,
265     unsigned short CDP_scratch_idx,
266     rrd_info_t ** pcdp_summary,
267     time_t rra_time);
269 static int smooth_all_rras(
270     rrd_t *rrd,
271     rrd_file_t *rrd_file,
272     unsigned long rra_begin);
274 #ifndef HAVE_MMAP
275 static int write_changes_to_disk(
276     rrd_t *rrd,
277     rrd_file_t *rrd_file,
278     int version);
279 #endif
281 /*
282  * normalize time as returned by gettimeofday. usec part must
283  * be always >= 0
284  */
285 static inline void normalize_time(
286     struct timeval *t)
288     if (t->tv_usec < 0) {
289         t->tv_sec--;
290         t->tv_usec += 1e6L;
291     }
294 /*
295  * Sets current_time and current_time_usec based on the current time.
296  * current_time_usec is set to 0 if the version number is 1 or 2.
297  */
298 static inline void initialize_time(
299     time_t *current_time,
300     unsigned long *current_time_usec,
301     int version)
303     struct timeval tmp_time;    /* used for time conversion */
305     gettimeofday(&tmp_time, 0);
306     normalize_time(&tmp_time);
307     *current_time = tmp_time.tv_sec;
308     if (version >= 3) {
309         *current_time_usec = tmp_time.tv_usec;
310     } else {
311         *current_time_usec = 0;
312     }
315 #define IFDNAN(X,Y) (isnan(X) ? (Y) : (X));
317 rrd_info_t *rrd_update_v(
318     int argc,
319     char **argv)
321     char     *tmplt = NULL;
322     rrd_info_t *result = NULL;
323     rrd_infoval_t rc;
324     struct option long_options[] = {
325         {"template", required_argument, 0, 't'},
326         {0, 0, 0, 0}
327     };
329     rc.u_int = -1;
330     optind = 0;
331     opterr = 0;         /* initialize getopt */
333     while (1) {
334         int       option_index = 0;
335         int       opt;
337         opt = getopt_long(argc, argv, "t:", long_options, &option_index);
339         if (opt == EOF)
340             break;
342         switch (opt) {
343         case 't':
344             tmplt = optarg;
345             break;
347         case '?':
348             rrd_set_error("unknown option '%s'", argv[optind - 1]);
349             goto end_tag;
350         }
351     }
353     /* need at least 2 arguments: filename, data. */
354     if (argc - optind < 2) {
355         rrd_set_error("Not enough arguments");
356         goto end_tag;
357     }
358     rc.u_int = 0;
359     result = rrd_info_push(NULL, sprintf_alloc("return_value"), RD_I_INT, rc);
360     rc.u_int = _rrd_update(argv[optind], tmplt,
361                            argc - optind - 1,
362                            (const char **) (argv + optind + 1), result);
363     result->value.u_int = rc.u_int;
364   end_tag:
365     return result;
368 int rrd_update(
369     int argc,
370     char **argv)
372     struct option long_options[] = {
373         {"template", required_argument, 0, 't'},
374         {"daemon",   required_argument, 0, 'd'},
375         {0, 0, 0, 0}
376     };
377     int       option_index = 0;
378     int       opt;
379     char     *tmplt = NULL;
380     int       rc = -1;
381     char     *opt_daemon = NULL;
383     optind = 0;
384     opterr = 0;         /* initialize getopt */
386     while (1) {
387         opt = getopt_long(argc, argv, "t:d:", long_options, &option_index);
389         if (opt == EOF)
390             break;
392         switch (opt) {
393         case 't':
394             tmplt = strdup(optarg);
395             break;
397         case 'd':
398             if (opt_daemon != NULL)
399                 free (opt_daemon);
400             opt_daemon = strdup (optarg);
401             if (opt_daemon == NULL)
402             {
403                 rrd_set_error("strdup failed.");
404                 goto out;
405             }
406             break;
408         case '?':
409             rrd_set_error("unknown option '%s'", argv[optind - 1]);
410             goto out;
411         }
412     }
414     /* need at least 2 arguments: filename, data. */
415     if (argc - optind < 2) {
416         rrd_set_error("Not enough arguments");
417         goto out;
418     }
420     if ((tmplt != NULL) && (opt_daemon != NULL))
421     {
422         rrd_set_error("The caching opt_daemon cannot be used together with "
423                 "templates yet.");
424         goto out;
425     }
427     if ((tmplt == NULL) && (opt_daemon == NULL))
428     {
429         char *temp;
431         temp = getenv (ENV_RRDCACHED_ADDRESS);
432         if (temp != NULL)
433         {
434             opt_daemon = strdup (temp);
435             if (opt_daemon == NULL)
436             {
437                 rrd_set_error("strdup failed.");
438                 goto out;
439             }
440         }
441     }
443     if (opt_daemon != NULL)
444     {
445         int status;
447         status = rrdc_connect (opt_daemon);
448         if (status != 0)
449         {
450             rrd_set_error("Unable to connect to opt_daemon: %s",
451                     (status < 0)
452                     ? "Internal error"
453                     : rrd_strerror (status));
454             goto out;
455         }
457         status = rrdc_update (/* file = */ argv[optind],
458                 /* values_num = */ argc - optind - 1,
459                 /* values = */ (void *) (argv + optind + 1));
460         if (status != 0)
461         {
462             rrd_set_error("Failed sending the values to the opt_daemon: %s",
463                     (status < 0)
464                     ? "Internal error"
465                     : rrd_strerror (status));
466         }
467         else
468         {
469             rc = 0;
470         }
472         rrdc_disconnect ();
473         goto out;
474     } /* if (opt_daemon != NULL) */
476     rc = rrd_update_r(argv[optind], tmplt,
477                       argc - optind - 1, (const char **) (argv + optind + 1));
478   out:
479     if (tmplt != NULL)
480     {
481         free(tmplt);
482         tmplt = NULL;
483     }
484     if (opt_daemon != NULL)
485     {
486         free (opt_daemon);
487         opt_daemon = NULL;
488     }
489     return rc;
492 int rrd_update_r(
493     const char *filename,
494     const char *tmplt,
495     int argc,
496     const char **argv)
498     return _rrd_update(filename, tmplt, argc, argv, NULL);
501 int _rrd_update(
502     const char *filename,
503     const char *tmplt,
504     int argc,
505     const char **argv,
506     rrd_info_t * pcdp_summary)
509     int       arg_i = 2;
511     unsigned long rra_begin;    /* byte pointer to the rra
512                                  * area in the rrd file.  this
513                                  * pointer never changes value */
514     rrd_value_t *pdp_new;   /* prepare the incoming data to be added 
515                              * to the existing entry */
516     rrd_value_t *pdp_temp;  /* prepare the pdp values to be added 
517                              * to the cdp values */
519     long     *tmpl_idx; /* index representing the settings
520                          * transported by the tmplt index */
521     unsigned long tmpl_cnt = 2; /* time and data */
522     rrd_t     rrd;
523     time_t    current_time = 0;
524     unsigned long current_time_usec = 0;    /* microseconds part of current time */
525     char    **updvals;
526     int       schedule_smooth = 0;
528     /* number of elapsed PDP steps since last update */
529     unsigned long *rra_step_cnt = NULL;
531     int       version;  /* rrd version */
532     rrd_file_t *rrd_file;
533     char     *arg_copy; /* for processing the argv */
534     unsigned long *skip_update; /* RRAs to advance but not write */
536     /* need at least 1 arguments: data. */
537     if (argc < 1) {
538         rrd_set_error("Not enough arguments");
539         goto err_out;
540     }
542     if ((rrd_file = rrd_open(filename, &rrd, RRD_READWRITE)) == NULL) {
543         goto err_free;
544     }
545     /* We are now at the beginning of the rra's */
546     rra_begin = rrd_file->header_len;
548     version = atoi(rrd.stat_head->version);
550     initialize_time(&current_time, &current_time_usec, version);
552     /* get exclusive lock to whole file.
553      * lock gets removed when we close the file.
554      */
555     if (rrd_lock(rrd_file) != 0) {
556         rrd_set_error("could not lock RRD");
557         goto err_close;
558     }
560     if (allocate_data_structures(&rrd, &updvals,
561                                  &pdp_temp, tmplt, &tmpl_idx, &tmpl_cnt,
562                                  &rra_step_cnt, &skip_update,
563                                  &pdp_new) == -1) {
564         goto err_close;
565     }
567     /* loop through the arguments. */
568     for (arg_i = 0; arg_i < argc; arg_i++) {
569         if ((arg_copy = strdup(argv[arg_i])) == NULL) {
570             rrd_set_error("failed duplication argv entry");
571             break;
572         }
573         if (process_arg(arg_copy, &rrd, rrd_file, rra_begin,
574                         &current_time, &current_time_usec, pdp_temp, pdp_new,
575                         rra_step_cnt, updvals, tmpl_idx, tmpl_cnt,
576                         &pcdp_summary, version, skip_update,
577                         &schedule_smooth) == -1) {
578             if (rrd_test_error()) { /* Should have error string always here */
579                 char     *save_error;
581                 /* Prepend file name to error message */
582                 if ((save_error = strdup(rrd_get_error())) != NULL) {
583                     rrd_set_error("%s: %s", filename, save_error);
584                     free(save_error);
585                 }
586             }
587             free(arg_copy);
588             break;
589         }
590         free(arg_copy);
591     }
593     free(rra_step_cnt);
595     /* if we got here and if there is an error and if the file has not been
596      * written to, then close things up and return. */
597     if (rrd_test_error()) {
598         goto err_free_structures;
599     }
600 #ifndef HAVE_MMAP
601     if (write_changes_to_disk(&rrd, rrd_file, version) == -1) {
602         goto err_free_structures;
603     }
604 #endif
606     /* calling the smoothing code here guarantees at most one smoothing
607      * operation per rrd_update call. Unfortunately, it is possible with bulk
608      * updates, or a long-delayed update for smoothing to occur off-schedule.
609      * This really isn't critical except during the burn-in cycles. */
610     if (schedule_smooth) {
611         smooth_all_rras(&rrd, rrd_file, rra_begin);
612     }
614 /*    rrd_dontneed(rrd_file,&rrd); */
615     rrd_free(&rrd);
616     rrd_close(rrd_file);
618     free(pdp_new);
619     free(tmpl_idx);
620     free(pdp_temp);
621     free(skip_update);
622     free(updvals);
623     return 0;
625   err_free_structures:
626     free(pdp_new);
627     free(tmpl_idx);
628     free(pdp_temp);
629     free(skip_update);
630     free(updvals);
631   err_close:
632     rrd_close(rrd_file);
633   err_free:
634     rrd_free(&rrd);
635   err_out:
636     return -1;
639 /*
640  * get exclusive lock to whole file.
641  * lock gets removed when we close the file
642  *
643  * returns 0 on success
644  */
645 int rrd_lock(
646     rrd_file_t *file)
648     int       rcstat;
650     {
651 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
652         struct _stat st;
654         if (_fstat(file->fd, &st) == 0) {
655             rcstat = _locking(file->fd, _LK_NBLCK, st.st_size);
656         } else {
657             rcstat = -1;
658         }
659 #else
660         struct flock lock;
662         lock.l_type = F_WRLCK;  /* exclusive write lock */
663         lock.l_len = 0; /* whole file */
664         lock.l_start = 0;   /* start of file */
665         lock.l_whence = SEEK_SET;   /* end of file */
667         rcstat = fcntl(file->fd, F_SETLK, &lock);
668 #endif
669     }
671     return (rcstat);
674 /*
675  * Allocate some important arrays used, and initialize the template.
676  *
677  * When it returns, either all of the structures are allocated
678  * or none of them are.
679  *
680  * Returns 0 on success, -1 on error.
681  */
682 static int allocate_data_structures(
683     rrd_t *rrd,
684     char ***updvals,
685     rrd_value_t **pdp_temp,
686     const char *tmplt,
687     long **tmpl_idx,
688     unsigned long *tmpl_cnt,
689     unsigned long **rra_step_cnt,
690     unsigned long **skip_update,
691     rrd_value_t **pdp_new)
693     unsigned  i, ii;
694     if ((*updvals = (char **) malloc(sizeof(char *)
695                                      * (rrd->stat_head->ds_cnt + 1))) == NULL) {
696         rrd_set_error("allocating updvals pointer array.");
697         return -1;
698     }
699     if ((*pdp_temp = (rrd_value_t *) malloc(sizeof(rrd_value_t)
700                                             * rrd->stat_head->ds_cnt)) ==
701         NULL) {
702         rrd_set_error("allocating pdp_temp.");
703         goto err_free_updvals;
704     }
705     if ((*skip_update = (unsigned long *) malloc(sizeof(unsigned long)
706                                                  *
707                                                  rrd->stat_head->rra_cnt)) ==
708         NULL) {
709         rrd_set_error("allocating skip_update.");
710         goto err_free_pdp_temp;
711     }
712     if ((*tmpl_idx = (long *) malloc(sizeof(unsigned long)
713                                      * (rrd->stat_head->ds_cnt + 1))) == NULL) {
714         rrd_set_error("allocating tmpl_idx.");
715         goto err_free_skip_update;
716     }
717     if ((*rra_step_cnt = (unsigned long *) malloc(sizeof(unsigned long)
718                                                   *
719                                                   (rrd->stat_head->
720                                                    rra_cnt))) == NULL) {
721         rrd_set_error("allocating rra_step_cnt.");
722         goto err_free_tmpl_idx;
723     }
725     /* initialize tmplt redirector */
726     /* default config example (assume DS 1 is a CDEF DS)
727        tmpl_idx[0] -> 0; (time)
728        tmpl_idx[1] -> 1; (DS 0)
729        tmpl_idx[2] -> 3; (DS 2)
730        tmpl_idx[3] -> 4; (DS 3) */
731     (*tmpl_idx)[0] = 0; /* time */
732     for (i = 1, ii = 1; i <= rrd->stat_head->ds_cnt; i++) {
733         if (dst_conv(rrd->ds_def[i - 1].dst) != DST_CDEF)
734             (*tmpl_idx)[ii++] = i;
735     }
736     *tmpl_cnt = ii;
738     if (tmplt != NULL) {
739         if (parse_template(rrd, tmplt, tmpl_cnt, *tmpl_idx) == -1) {
740             goto err_free_rra_step_cnt;
741         }
742     }
744     if ((*pdp_new = (rrd_value_t *) malloc(sizeof(rrd_value_t)
745                                            * rrd->stat_head->ds_cnt)) == NULL) {
746         rrd_set_error("allocating pdp_new.");
747         goto err_free_rra_step_cnt;
748     }
750     return 0;
752   err_free_rra_step_cnt:
753     free(*rra_step_cnt);
754   err_free_tmpl_idx:
755     free(*tmpl_idx);
756   err_free_skip_update:
757     free(*skip_update);
758   err_free_pdp_temp:
759     free(*pdp_temp);
760   err_free_updvals:
761     free(*updvals);
762     return -1;
765 /*
766  * Parses tmplt and puts an ordered list of DS's into tmpl_idx.
767  *
768  * Returns 0 on success.
769  */
770 static int parse_template(
771     rrd_t *rrd,
772     const char *tmplt,
773     unsigned long *tmpl_cnt,
774     long *tmpl_idx)
776     char     *dsname, *tmplt_copy;
777     unsigned int tmpl_len, i;
778     int       ret = 0;
780     *tmpl_cnt = 1;      /* the first entry is the time */
782     /* we should work on a writeable copy here */
783     if ((tmplt_copy = strdup(tmplt)) == NULL) {
784         rrd_set_error("error copying tmplt '%s'", tmplt);
785         ret = -1;
786         goto out;
787     }
789     dsname = tmplt_copy;
790     tmpl_len = strlen(tmplt_copy);
791     for (i = 0; i <= tmpl_len; i++) {
792         if (tmplt_copy[i] == ':' || tmplt_copy[i] == '\0') {
793             tmplt_copy[i] = '\0';
794             if (*tmpl_cnt > rrd->stat_head->ds_cnt) {
795                 rrd_set_error("tmplt contains more DS definitions than RRD");
796                 ret = -1;
797                 goto out_free_tmpl_copy;
798             }
799             if ((tmpl_idx[(*tmpl_cnt)++] = ds_match(rrd, dsname) + 1) == 0) {
800                 rrd_set_error("unknown DS name '%s'", dsname);
801                 ret = -1;
802                 goto out_free_tmpl_copy;
803             }
804             /* go to the next entry on the tmplt_copy */
805             if (i < tmpl_len)
806                 dsname = &tmplt_copy[i + 1];
807         }
808     }
809   out_free_tmpl_copy:
810     free(tmplt_copy);
811   out:
812     return ret;
815 /*
816  * Parse an update string, updates the primary data points (PDPs)
817  * and consolidated data points (CDPs), and writes changes to the RRAs.
818  *
819  * Returns 0 on success, -1 on error.
820  */
821 static int process_arg(
822     char *step_start,
823     rrd_t *rrd,
824     rrd_file_t *rrd_file,
825     unsigned long rra_begin,
826     time_t *current_time,
827     unsigned long *current_time_usec,
828     rrd_value_t *pdp_temp,
829     rrd_value_t *pdp_new,
830     unsigned long *rra_step_cnt,
831     char **updvals,
832     long *tmpl_idx,
833     unsigned long tmpl_cnt,
834     rrd_info_t ** pcdp_summary,
835     int version,
836     unsigned long *skip_update,
837     int *schedule_smooth)
839     rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
841     /* a vector of future Holt-Winters seasonal coefs */
842     unsigned long elapsed_pdp_st;
844     double    interval, pre_int, post_int;  /* interval between this and
845                                              * the last run */
846     unsigned long proc_pdp_cnt;
848     if (parse_ds(rrd, updvals, tmpl_idx, step_start, tmpl_cnt,
849                  current_time, current_time_usec, version) == -1) {
850         return -1;
851     }
853     interval = (double) (*current_time - rrd->live_head->last_up)
854         + (double) ((long) *current_time_usec -
855                     (long) rrd->live_head->last_up_usec) / 1e6f;
857     /* process the data sources and update the pdp_prep 
858      * area accordingly */
859     if (update_pdp_prep(rrd, updvals, pdp_new, interval) == -1) {
860         return -1;
861     }
863     elapsed_pdp_st = calculate_elapsed_steps(rrd,
864                                              *current_time,
865                                              *current_time_usec, interval,
866                                              &pre_int, &post_int,
867                                              &proc_pdp_cnt);
869     /* has a pdp_st moment occurred since the last run ? */
870     if (elapsed_pdp_st == 0) {
871         /* no we have not passed a pdp_st moment. therefore update is simple */
872         simple_update(rrd, interval, pdp_new);
873     } else {
874         /* an pdp_st has occurred. */
875         if (process_all_pdp_st(rrd, interval,
876                                pre_int, post_int,
877                                elapsed_pdp_st, pdp_new, pdp_temp) == -1) {
878             return -1;
879         }
880         if (update_all_cdp_prep(rrd, rra_step_cnt,
881                                 rra_begin, rrd_file,
882                                 elapsed_pdp_st,
883                                 proc_pdp_cnt,
884                                 &last_seasonal_coef,
885                                 &seasonal_coef,
886                                 pdp_temp,
887                                 skip_update, schedule_smooth) == -1) {
888             goto err_free_coefficients;
889         }
890         if (update_aberrant_cdps(rrd, rrd_file, rra_begin,
891                                  elapsed_pdp_st, pdp_temp,
892                                  &seasonal_coef) == -1) {
893             goto err_free_coefficients;
894         }
895         if (write_to_rras(rrd, rrd_file, rra_step_cnt, rra_begin,
896                           *current_time, skip_update,
897                           pcdp_summary) == -1) {
898             goto err_free_coefficients;
899         }
900     }                   /* endif a pdp_st has occurred */
901     rrd->live_head->last_up = *current_time;
902     rrd->live_head->last_up_usec = *current_time_usec;
904     if (version < 3) {
905         *rrd->legacy_last_up = rrd->live_head->last_up;
906     }
907     free(seasonal_coef);
908     free(last_seasonal_coef);
909     return 0;
911   err_free_coefficients:
912     free(seasonal_coef);
913     free(last_seasonal_coef);
914     return -1;
917 /*
918  * Parse a DS string (time + colon-separated values), storing the
919  * results in current_time, current_time_usec, and updvals.
920  *
921  * Returns 0 on success, -1 on error.
922  */
923 static int parse_ds(
924     rrd_t *rrd,
925     char **updvals,
926     long *tmpl_idx,
927     char *input,
928     unsigned long tmpl_cnt,
929     time_t *current_time,
930     unsigned long *current_time_usec,
931     int version)
933     char     *p;
934     unsigned long i;
935     char      timesyntax;
937     updvals[0] = input;
938     /* initialize all ds input to unknown except the first one
939        which has always got to be set */
940     for (i = 1; i <= rrd->stat_head->ds_cnt; i++)
941         updvals[i] = "U";
943     /* separate all ds elements; first must be examined separately
944        due to alternate time syntax */
945     if ((p = strchr(input, '@')) != NULL) {
946         timesyntax = '@';
947     } else if ((p = strchr(input, ':')) != NULL) {
948         timesyntax = ':';
949     } else {
950         rrd_set_error("expected timestamp not found in data source from %s",
951                       input);
952         return -1;
953     }
954     *p = '\0';
955     i = 1;
956     updvals[tmpl_idx[i++]] = p + 1;
957     while (*(++p)) {
958         if (*p == ':') {
959             *p = '\0';
960             if (i < tmpl_cnt) {
961                 updvals[tmpl_idx[i++]] = p + 1;
962             }
963         }
964     }
966     if (i != tmpl_cnt) {
967         rrd_set_error("expected %lu data source readings (got %lu) from %s",
968                       tmpl_cnt - 1, i, input);
969         return -1;
970     }
972     if (get_time_from_reading(rrd, timesyntax, updvals,
973                               current_time, current_time_usec,
974                               version) == -1) {
975         return -1;
976     }
977     return 0;
980 /*
981  * Parse the time in a DS string, store it in current_time and 
982  * current_time_usec and verify that it's later than the last
983  * update for this DS.
984  *
985  * Returns 0 on success, -1 on error.
986  */
987 static int get_time_from_reading(
988     rrd_t *rrd,
989     char timesyntax,
990     char **updvals,
991     time_t *current_time,
992     unsigned long *current_time_usec,
993     int version)
995     double    tmp;
996     char     *parsetime_error = NULL;
997     char     *old_locale;
998     rrd_time_value_t ds_tv;
999     struct timeval tmp_time;    /* used for time conversion */
1001     /* get the time from the reading ... handle N */
1002     if (timesyntax == '@') {    /* at-style */
1003         if ((parsetime_error = rrd_parsetime(updvals[0], &ds_tv))) {
1004             rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error);
1005             return -1;
1006         }
1007         if (ds_tv.type == RELATIVE_TO_END_TIME ||
1008             ds_tv.type == RELATIVE_TO_START_TIME) {
1009             rrd_set_error("specifying time relative to the 'start' "
1010                           "or 'end' makes no sense here: %s", updvals[0]);
1011             return -1;
1012         }
1013         *current_time = mktime(&ds_tv.tm) +ds_tv.offset;
1014         *current_time_usec = 0; /* FIXME: how to handle usecs here ? */
1015     } else if (strcmp(updvals[0], "N") == 0) {
1016         gettimeofday(&tmp_time, 0);
1017         normalize_time(&tmp_time);
1018         *current_time = tmp_time.tv_sec;
1019         *current_time_usec = tmp_time.tv_usec;
1020     } else {
1021         old_locale = setlocale(LC_NUMERIC, "C");
1022         tmp = strtod(updvals[0], 0);
1023         setlocale(LC_NUMERIC, old_locale);
1024         *current_time = floor(tmp);
1025         *current_time_usec = (long) ((tmp - (double) *current_time) * 1e6f);
1026     }
1027     /* dont do any correction for old version RRDs */
1028     if (version < 3)
1029         *current_time_usec = 0;
1031     if (*current_time < rrd->live_head->last_up ||
1032         (*current_time == rrd->live_head->last_up &&
1033          (long) *current_time_usec <= (long) rrd->live_head->last_up_usec)) {
1034         rrd_set_error("illegal attempt to update using time %ld when "
1035                       "last update time is %ld (minimum one second step)",
1036                       *current_time, rrd->live_head->last_up);
1037         return -1;
1038     }
1039     return 0;
1042 /*
1043  * Update pdp_new by interpreting the updvals according to the DS type
1044  * (COUNTER, GAUGE, etc.).
1045  *
1046  * Returns 0 on success, -1 on error.
1047  */
1048 static int update_pdp_prep(
1049     rrd_t *rrd,
1050     char **updvals,
1051     rrd_value_t *pdp_new,
1052     double interval)
1054     unsigned long ds_idx;
1055     int       ii;
1056     char     *endptr;   /* used in the conversion */
1057     double    rate;
1058     char     *old_locale;
1059     enum dst_en dst_idx;
1061     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1062         dst_idx = dst_conv(rrd->ds_def[ds_idx].dst);
1064         /* make sure we do not build diffs with old last_ds values */
1065         if (rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt < interval) {
1066             strncpy(rrd->pdp_prep[ds_idx].last_ds, "U", LAST_DS_LEN - 1);
1067             rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
1068         }
1070         /* NOTE: DST_CDEF should never enter this if block, because
1071          * updvals[ds_idx+1][0] is initialized to 'U'; unless the caller
1072          * accidently specified a value for the DST_CDEF. To handle this case,
1073          * an extra check is required. */
1075         if ((updvals[ds_idx + 1][0] != 'U') &&
1076             (dst_idx != DST_CDEF) &&
1077             rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt >= interval) {
1078             rate = DNAN;
1080             /* pdp_new contains rate * time ... eg the bytes transferred during
1081              * the interval. Doing it this way saves a lot of math operations
1082              */
1083             switch (dst_idx) {
1084             case DST_COUNTER:
1085             case DST_DERIVE:
1086                 for (ii = 0; updvals[ds_idx + 1][ii] != '\0'; ii++) {
1087                     if ((updvals[ds_idx + 1][ii] < '0'
1088                          || updvals[ds_idx + 1][ii] > '9')
1089                         && (ii != 0 && updvals[ds_idx + 1][ii] != '-')) {
1090                         rrd_set_error("not a simple integer: '%s'",
1091                                       updvals[ds_idx + 1]);
1092                         return -1;
1093                     }
1094                 }
1095                 if (rrd->pdp_prep[ds_idx].last_ds[0] != 'U') {
1096                     pdp_new[ds_idx] =
1097                         rrd_diff(updvals[ds_idx + 1],
1098                                  rrd->pdp_prep[ds_idx].last_ds);
1099                     if (dst_idx == DST_COUNTER) {
1100                         /* simple overflow catcher. This will fail
1101                          * terribly for non 32 or 64 bit counters
1102                          * ... are there any others in SNMP land?
1103                          */
1104                         if (pdp_new[ds_idx] < (double) 0.0)
1105                             pdp_new[ds_idx] += (double) 4294967296.0;   /* 2^32 */
1106                         if (pdp_new[ds_idx] < (double) 0.0)
1107                             pdp_new[ds_idx] += (double) 18446744069414584320.0; /* 2^64-2^32 */
1108                     }
1109                     rate = pdp_new[ds_idx] / interval;
1110                 } else {
1111                     pdp_new[ds_idx] = DNAN;
1112                 }
1113                 break;
1114             case DST_ABSOLUTE:
1115                 old_locale = setlocale(LC_NUMERIC, "C");
1116                 errno = 0;
1117                 pdp_new[ds_idx] = strtod(updvals[ds_idx + 1], &endptr);
1118                 setlocale(LC_NUMERIC, old_locale);
1119                 if (errno > 0) {
1120                     rrd_set_error("converting '%s' to float: %s",
1121                                   updvals[ds_idx + 1], rrd_strerror(errno));
1122                     return -1;
1123                 };
1124                 if (endptr[0] != '\0') {
1125                     rrd_set_error
1126                         ("conversion of '%s' to float not complete: tail '%s'",
1127                          updvals[ds_idx + 1], endptr);
1128                     return -1;
1129                 }
1130                 rate = pdp_new[ds_idx] / interval;
1131                 break;
1132             case DST_GAUGE:
1133                 errno = 0;
1134                 old_locale = setlocale(LC_NUMERIC, "C");
1135                 pdp_new[ds_idx] =
1136                     strtod(updvals[ds_idx + 1], &endptr) * interval;
1137                 setlocale(LC_NUMERIC, old_locale);
1138                 if (errno) {
1139                     rrd_set_error("converting '%s' to float: %s",
1140                                   updvals[ds_idx + 1], rrd_strerror(errno));
1141                     return -1;
1142                 };
1143                 if (endptr[0] != '\0') {
1144                     rrd_set_error
1145                         ("conversion of '%s' to float not complete: tail '%s'",
1146                          updvals[ds_idx + 1], endptr);
1147                     return -1;
1148                 }
1149                 rate = pdp_new[ds_idx] / interval;
1150                 break;
1151             default:
1152                 rrd_set_error("rrd contains unknown DS type : '%s'",
1153                               rrd->ds_def[ds_idx].dst);
1154                 return -1;
1155             }
1156             /* break out of this for loop if the error string is set */
1157             if (rrd_test_error()) {
1158                 return -1;
1159             }
1160             /* make sure pdp_temp is neither too large or too small
1161              * if any of these occur it becomes unknown ...
1162              * sorry folks ... */
1163             if (!isnan(rate) &&
1164                 ((!isnan(rrd->ds_def[ds_idx].par[DS_max_val].u_val) &&
1165                   rate > rrd->ds_def[ds_idx].par[DS_max_val].u_val) ||
1166                  (!isnan(rrd->ds_def[ds_idx].par[DS_min_val].u_val) &&
1167                   rate < rrd->ds_def[ds_idx].par[DS_min_val].u_val))) {
1168                 pdp_new[ds_idx] = DNAN;
1169             }
1170         } else {
1171             /* no news is news all the same */
1172             pdp_new[ds_idx] = DNAN;
1173         }
1176         /* make a copy of the command line argument for the next run */
1177 #ifdef DEBUG
1178         fprintf(stderr, "prep ds[%lu]\t"
1179                 "last_arg '%s'\t"
1180                 "this_arg '%s'\t"
1181                 "pdp_new %10.2f\n",
1182                 ds_idx, rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1183                 pdp_new[ds_idx]);
1184 #endif
1185         strncpy(rrd->pdp_prep[ds_idx].last_ds, updvals[ds_idx + 1],
1186                 LAST_DS_LEN - 1);
1187         rrd->pdp_prep[ds_idx].last_ds[LAST_DS_LEN - 1] = '\0';
1188     }
1189     return 0;
1192 /*
1193  * How many PDP steps have elapsed since the last update? Returns the answer,
1194  * and stores the time between the last update and the last PDP in pre_time,
1195  * and the time between the last PDP and the current time in post_int.
1196  */
1197 static int calculate_elapsed_steps(
1198     rrd_t *rrd,
1199     unsigned long current_time,
1200     unsigned long current_time_usec,
1201     double interval,
1202     double *pre_int,
1203     double *post_int,
1204     unsigned long *proc_pdp_cnt)
1206     unsigned long proc_pdp_st;  /* which pdp_st was the last to be processed */
1207     unsigned long occu_pdp_st;  /* when was the pdp_st before the last update
1208                                  * time */
1209     unsigned long proc_pdp_age; /* how old was the data in the pdp prep area 
1210                                  * when it was last updated */
1211     unsigned long occu_pdp_age; /* how long ago was the last pdp_step time */
1213     /* when was the current pdp started */
1214     proc_pdp_age = rrd->live_head->last_up % rrd->stat_head->pdp_step;
1215     proc_pdp_st = rrd->live_head->last_up - proc_pdp_age;
1217     /* when did the last pdp_st occur */
1218     occu_pdp_age = current_time % rrd->stat_head->pdp_step;
1219     occu_pdp_st = current_time - occu_pdp_age;
1221     if (occu_pdp_st > proc_pdp_st) {
1222         /* OK we passed the pdp_st moment */
1223         *pre_int = (long) occu_pdp_st - rrd->live_head->last_up;    /* how much of the input data
1224                                                                      * occurred before the latest
1225                                                                      * pdp_st moment*/
1226         *pre_int -= ((double) rrd->live_head->last_up_usec) / 1e6f; /* adjust usecs */
1227         *post_int = occu_pdp_age;   /* how much after it */
1228         *post_int += ((double) current_time_usec) / 1e6f;   /* adjust usecs */
1229     } else {
1230         *pre_int = interval;
1231         *post_int = 0;
1232     }
1234     *proc_pdp_cnt = proc_pdp_st / rrd->stat_head->pdp_step;
1236 #ifdef DEBUG
1237     printf("proc_pdp_age %lu\t"
1238            "proc_pdp_st %lu\t"
1239            "occu_pfp_age %lu\t"
1240            "occu_pdp_st %lu\t"
1241            "int %lf\t"
1242            "pre_int %lf\t"
1243            "post_int %lf\n", proc_pdp_age, proc_pdp_st,
1244            occu_pdp_age, occu_pdp_st, interval, *pre_int, *post_int);
1245 #endif
1247     /* compute the number of elapsed pdp_st moments */
1248     return (occu_pdp_st - proc_pdp_st) / rrd->stat_head->pdp_step;
1251 /*
1252  * Increment the PDP values by the values in pdp_new, or else initialize them.
1253  */
1254 static void simple_update(
1255     rrd_t *rrd,
1256     double interval,
1257     rrd_value_t *pdp_new)
1259     int       i;
1261     for (i = 0; i < (signed) rrd->stat_head->ds_cnt; i++) {
1262         if (isnan(pdp_new[i])) {
1263             /* this is not really accurate if we use subsecond data arrival time
1264                should have thought of it when going subsecond resolution ...
1265                sorry next format change we will have it! */
1266             rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt +=
1267                 floor(interval);
1268         } else {
1269             if (isnan(rrd->pdp_prep[i].scratch[PDP_val].u_val)) {
1270                 rrd->pdp_prep[i].scratch[PDP_val].u_val = pdp_new[i];
1271             } else {
1272                 rrd->pdp_prep[i].scratch[PDP_val].u_val += pdp_new[i];
1273             }
1274         }
1275 #ifdef DEBUG
1276         fprintf(stderr,
1277                 "NO PDP  ds[%i]\t"
1278                 "value %10.2f\t"
1279                 "unkn_sec %5lu\n",
1280                 i,
1281                 rrd->pdp_prep[i].scratch[PDP_val].u_val,
1282                 rrd->pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
1283 #endif
1284     }
1287 /*
1288  * Call process_pdp_st for each DS.
1289  *
1290  * Returns 0 on success, -1 on error.
1291  */
1292 static int process_all_pdp_st(
1293     rrd_t *rrd,
1294     double interval,
1295     double pre_int,
1296     double post_int,
1297     unsigned long elapsed_pdp_st,
1298     rrd_value_t *pdp_new,
1299     rrd_value_t *pdp_temp)
1301     unsigned long ds_idx;
1303     /* in pdp_prep[].scratch[PDP_val].u_val we have collected
1304        rate*seconds which occurred up to the last run.
1305        pdp_new[] contains rate*seconds from the latest run.
1306        pdp_temp[] will contain the rate for cdp */
1308     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1309         if (process_pdp_st(rrd, ds_idx, interval, pre_int, post_int,
1310                            elapsed_pdp_st * rrd->stat_head->pdp_step,
1311                            pdp_new, pdp_temp) == -1) {
1312             return -1;
1313         }
1314 #ifdef DEBUG
1315         fprintf(stderr, "PDP UPD ds[%lu]\t"
1316                 "elapsed_pdp_st %lu\t"
1317                 "pdp_temp %10.2f\t"
1318                 "new_prep %10.2f\t"
1319                 "new_unkn_sec %5lu\n",
1320                 ds_idx,
1321                 elapsed_pdp_st,
1322                 pdp_temp[ds_idx],
1323                 rrd->pdp_prep[ds_idx].scratch[PDP_val].u_val,
1324                 rrd->pdp_prep[ds_idx].scratch[PDP_unkn_sec_cnt].u_cnt);
1325 #endif
1326     }
1327     return 0;
1330 /*
1331  * Process an update that occurs after one of the PDP moments.
1332  * Increments the PDP value, sets NAN if time greater than the
1333  * heartbeats have elapsed, processes CDEFs.
1334  *
1335  * Returns 0 on success, -1 on error.
1336  */
1337 static int process_pdp_st(
1338     rrd_t *rrd,
1339     unsigned long ds_idx,
1340     double interval,
1341     double pre_int,
1342     double post_int,
1343     long diff_pdp_st,   /* number of seconds in full steps passed since last update */
1344     rrd_value_t *pdp_new,
1345     rrd_value_t *pdp_temp)
1347     int       i;
1349     /* update pdp_prep to the current pdp_st. */
1350     double    pre_unknown = 0.0;
1351     unival   *scratch = rrd->pdp_prep[ds_idx].scratch;
1352     unsigned long mrhb = rrd->ds_def[ds_idx].par[DS_mrhb_cnt].u_cnt;
1354     rpnstack_t rpnstack;    /* used for COMPUTE DS */
1356     rpnstack_init(&rpnstack);
1359     if (isnan(pdp_new[ds_idx])) {
1360         /* a final bit of unknown to be added before calculation
1361            we use a temporary variable for this so that we
1362            don't have to turn integer lines before using the value */
1363         pre_unknown = pre_int;
1364     } else {
1365         if (isnan(scratch[PDP_val].u_val)) {
1366             scratch[PDP_val].u_val = 0;
1367         }
1368         scratch[PDP_val].u_val += pdp_new[ds_idx] / interval * pre_int;
1369     }
1371     /* if too much of the pdp_prep is unknown we dump it */
1372     /* if the interval is larger thatn mrhb we get NAN */
1373     if ((interval > mrhb) ||
1374         (rrd->stat_head->pdp_step / 2.0 <
1375          (signed) scratch[PDP_unkn_sec_cnt].u_cnt)) {
1376         pdp_temp[ds_idx] = DNAN;
1377     } else {
1378         pdp_temp[ds_idx] = scratch[PDP_val].u_val /
1379             ((double) (diff_pdp_st - scratch[PDP_unkn_sec_cnt].u_cnt) -
1380              pre_unknown);
1381     }
1383     /* process CDEF data sources; remember each CDEF DS can
1384      * only reference other DS with a lower index number */
1385     if (dst_conv(rrd->ds_def[ds_idx].dst) == DST_CDEF) {
1386         rpnp_t   *rpnp;
1388         rpnp =
1389             rpn_expand((rpn_cdefds_t *) &(rrd->ds_def[ds_idx].par[DS_cdef]));
1390         /* substitute data values for OP_VARIABLE nodes */
1391         for (i = 0; rpnp[i].op != OP_END; i++) {
1392             if (rpnp[i].op == OP_VARIABLE) {
1393                 rpnp[i].op = OP_NUMBER;
1394                 rpnp[i].val = pdp_temp[rpnp[i].ptr];
1395             }
1396         }
1397         /* run the rpn calculator */
1398         if (rpn_calc(rpnp, &rpnstack, 0, pdp_temp, ds_idx) == -1) {
1399             free(rpnp);
1400             rpnstack_free(&rpnstack);
1401             return -1;
1402         }
1403     }
1405     /* make pdp_prep ready for the next run */
1406     if (isnan(pdp_new[ds_idx])) {
1407         /* this is not realy accurate if we use subsecond data arival time
1408            should have thought of it when going subsecond resolution ...
1409            sorry next format change we will have it! */
1410         scratch[PDP_unkn_sec_cnt].u_cnt = floor(post_int);
1411         scratch[PDP_val].u_val = DNAN;
1412     } else {
1413         scratch[PDP_unkn_sec_cnt].u_cnt = 0;
1414         scratch[PDP_val].u_val = pdp_new[ds_idx] / interval * post_int;
1415     }
1416     rpnstack_free(&rpnstack);
1417     return 0;
1420 /*
1421  * Iterate over all the RRAs for a given DS and:
1422  * 1. Decide whether to schedule a smooth later
1423  * 2. Decide whether to skip updating SEASONAL and DEVSEASONAL
1424  * 3. Update the CDP
1425  *
1426  * Returns 0 on success, -1 on error
1427  */
1428 static int update_all_cdp_prep(
1429     rrd_t *rrd,
1430     unsigned long *rra_step_cnt,
1431     unsigned long rra_begin,
1432     rrd_file_t *rrd_file,
1433     unsigned long elapsed_pdp_st,
1434     unsigned long proc_pdp_cnt,
1435     rrd_value_t **last_seasonal_coef,
1436     rrd_value_t **seasonal_coef,
1437     rrd_value_t *pdp_temp,
1438     unsigned long *skip_update,
1439     int *schedule_smooth)
1441     unsigned long rra_idx;
1443     /* index into the CDP scratch array */
1444     enum cf_en current_cf;
1445     unsigned long rra_start;
1447     /* number of rows to be updated in an RRA for a data value. */
1448     unsigned long start_pdp_offset;
1450     rra_start = rra_begin;
1451     for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1452         current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1453         start_pdp_offset =
1454             rrd->rra_def[rra_idx].pdp_cnt -
1455             proc_pdp_cnt % rrd->rra_def[rra_idx].pdp_cnt;
1456         skip_update[rra_idx] = 0;
1457         if (start_pdp_offset <= elapsed_pdp_st) {
1458             rra_step_cnt[rra_idx] = (elapsed_pdp_st - start_pdp_offset) /
1459                 rrd->rra_def[rra_idx].pdp_cnt + 1;
1460         } else {
1461             rra_step_cnt[rra_idx] = 0;
1462         }
1464         if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1465             /* If this is a bulk update, we need to skip ahead in the seasonal arrays
1466              * so that they will be correct for the next observed value; note that for
1467              * the bulk update itself, no update will occur to DEVSEASONAL or SEASONAL;
1468              * futhermore, HWPREDICT and DEVPREDICT will be set to DNAN. */
1469             if (rra_step_cnt[rra_idx] > 1) {
1470                 skip_update[rra_idx] = 1;
1471                 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1472                                 elapsed_pdp_st, last_seasonal_coef);
1473                 lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1474                                 elapsed_pdp_st + 1, seasonal_coef);
1475             }
1476             /* periodically run a smoother for seasonal effects */
1477             if (do_schedule_smooth(rrd, rra_idx, elapsed_pdp_st)) {
1478 #ifdef DEBUG
1479                 fprintf(stderr,
1480                         "schedule_smooth: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
1481                         rrd->rra_ptr[rra_idx].cur_row, elapsed_pdp_st,
1482                         rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].
1483                         u_cnt);
1484 #endif
1485                 *schedule_smooth = 1;
1486             }
1487         }
1488         if (rrd_test_error())
1489             return -1;
1491         if (update_cdp_prep
1492             (rrd, elapsed_pdp_st, start_pdp_offset, rra_step_cnt, rra_idx,
1493              pdp_temp, *last_seasonal_coef, *seasonal_coef,
1494              current_cf) == -1) {
1495             return -1;
1496         }
1497         rra_start +=
1498             rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt *
1499             sizeof(rrd_value_t);
1500     }
1501     return 0;
1504 /* 
1505  * Are we due for a smooth? Also increments our position in the burn-in cycle.
1506  */
1507 static int do_schedule_smooth(
1508     rrd_t *rrd,
1509     unsigned long rra_idx,
1510     unsigned long elapsed_pdp_st)
1512     unsigned long cdp_idx = rra_idx * (rrd->stat_head->ds_cnt);
1513     unsigned long cur_row = rrd->rra_ptr[rra_idx].cur_row;
1514     unsigned long row_cnt = rrd->rra_def[rra_idx].row_cnt;
1515     unsigned long seasonal_smooth_idx =
1516         rrd->rra_def[rra_idx].par[RRA_seasonal_smooth_idx].u_cnt;
1517     unsigned long *init_seasonal =
1518         &(rrd->cdp_prep[cdp_idx].scratch[CDP_init_seasonal].u_cnt);
1520     /* Need to use first cdp parameter buffer to track burnin (burnin requires
1521      * a specific smoothing schedule).  The CDP_init_seasonal parameter is
1522      * really an RRA level, not a data source within RRA level parameter, but
1523      * the rra_def is read only for rrd_update (not flushed to disk). */
1524     if (*init_seasonal > BURNIN_CYCLES) {
1525         /* someone has no doubt invented a trick to deal with this wrap around,
1526          * but at least this code is clear. */
1527         if (seasonal_smooth_idx > cur_row) {
1528             /* here elapsed_pdp_st = rra_step_cnt[rra_idx] because of 1-1 mapping
1529              * between PDP and CDP */
1530             return (cur_row + elapsed_pdp_st >= seasonal_smooth_idx);
1531         }
1532         /* can't rely on negative numbers because we are working with
1533          * unsigned values */
1534         return (cur_row + elapsed_pdp_st >= row_cnt
1535                 && cur_row + elapsed_pdp_st >= row_cnt + seasonal_smooth_idx);
1536     }
1537     /* mark off one of the burn-in cycles */
1538     return (cur_row + elapsed_pdp_st >= row_cnt && ++(*init_seasonal));
1541 /*
1542  * For a given RRA, iterate over the data sources and call the appropriate
1543  * consolidation function.
1544  *
1545  * Returns 0 on success, -1 on error.
1546  */
1547 static int update_cdp_prep(
1548     rrd_t *rrd,
1549     unsigned long elapsed_pdp_st,
1550     unsigned long start_pdp_offset,
1551     unsigned long *rra_step_cnt,
1552     int rra_idx,
1553     rrd_value_t *pdp_temp,
1554     rrd_value_t *last_seasonal_coef,
1555     rrd_value_t *seasonal_coef,
1556     int current_cf)
1558     unsigned long ds_idx, cdp_idx;
1560     /* update CDP_PREP areas */
1561     /* loop over data soures within each RRA */
1562     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1564         cdp_idx = rra_idx * rrd->stat_head->ds_cnt + ds_idx;
1566         if (rrd->rra_def[rra_idx].pdp_cnt > 1) {
1567             update_cdp(rrd->cdp_prep[cdp_idx].scratch, current_cf,
1568                        pdp_temp[ds_idx], rra_step_cnt[rra_idx],
1569                        elapsed_pdp_st, start_pdp_offset,
1570                        rrd->rra_def[rra_idx].pdp_cnt,
1571                        rrd->rra_def[rra_idx].par[RRA_cdp_xff_val].u_val,
1572                        rra_idx, ds_idx);
1573         } else {
1574             /* Nothing to consolidate if there's one PDP per CDP. However, if
1575              * we've missed some PDPs, let's update null counters etc. */
1576             if (elapsed_pdp_st > 2) {
1577                 reset_cdp(rrd, elapsed_pdp_st, pdp_temp, last_seasonal_coef,
1578                           seasonal_coef, rra_idx, ds_idx, cdp_idx,
1579                           current_cf);
1580             }
1581         }
1583         if (rrd_test_error())
1584             return -1;
1585     }                   /* endif data sources loop */
1586     return 0;
1589 /*
1590  * Given the new reading (pdp_temp_val), update or initialize the CDP value,
1591  * primary value, secondary value, and # of unknowns.
1592  */
1593 static void update_cdp(
1594     unival *scratch,
1595     int current_cf,
1596     rrd_value_t pdp_temp_val,
1597     unsigned long rra_step_cnt,
1598     unsigned long elapsed_pdp_st,
1599     unsigned long start_pdp_offset,
1600     unsigned long pdp_cnt,
1601     rrd_value_t xff,
1602     int i,
1603     int ii)
1605     /* shorthand variables */
1606     rrd_value_t *cdp_val = &scratch[CDP_val].u_val;
1607     rrd_value_t *cdp_primary_val = &scratch[CDP_primary_val].u_val;
1608     rrd_value_t *cdp_secondary_val = &scratch[CDP_secondary_val].u_val;
1609     unsigned long *cdp_unkn_pdp_cnt = &scratch[CDP_unkn_pdp_cnt].u_cnt;
1611     if (rra_step_cnt) {
1612         /* If we are in this block, as least 1 CDP value will be written to
1613          * disk, this is the CDP_primary_val entry. If more than 1 value needs
1614          * to be written, then the "fill in" value is the CDP_secondary_val
1615          * entry. */
1616         if (isnan(pdp_temp_val)) {
1617             *cdp_unkn_pdp_cnt += start_pdp_offset;
1618             *cdp_secondary_val = DNAN;
1619         } else {
1620             /* CDP_secondary value is the RRA "fill in" value for intermediary
1621              * CDP data entries. No matter the CF, the value is the same because
1622              * the average, max, min, and last of a list of identical values is
1623              * the same, namely, the value itself. */
1624             *cdp_secondary_val = pdp_temp_val;
1625         }
1627         if (*cdp_unkn_pdp_cnt > pdp_cnt * xff) {
1628             *cdp_primary_val = DNAN;
1629             if (current_cf == CF_AVERAGE) {
1630                 *cdp_val =
1631                     initialize_average_carry_over(pdp_temp_val,
1632                                                   elapsed_pdp_st,
1633                                                   start_pdp_offset, pdp_cnt);
1634             } else {
1635                 *cdp_val = pdp_temp_val;
1636             }
1637         } else {
1638             initialize_cdp_val(scratch, current_cf, pdp_temp_val,
1639                                elapsed_pdp_st, start_pdp_offset, pdp_cnt);
1640         }               /* endif meets xff value requirement for a valid value */
1641         /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
1642          * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1643         if (isnan(pdp_temp_val))
1644             *cdp_unkn_pdp_cnt = (elapsed_pdp_st - start_pdp_offset) % pdp_cnt;
1645         else
1646             *cdp_unkn_pdp_cnt = 0;
1647     } else {            /* rra_step_cnt[i]  == 0 */
1649 #ifdef DEBUG
1650         if (isnan(*cdp_val)) {
1651             fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, DNAN\n",
1652                     i, ii);
1653         } else {
1654             fprintf(stderr, "schedule CDP_val update, RRA %d DS %d, %10.2f\n",
1655                     i, ii, *cdp_val);
1656         }
1657 #endif
1658         if (isnan(pdp_temp_val)) {
1659             *cdp_unkn_pdp_cnt += elapsed_pdp_st;
1660         } else {
1661             *cdp_val =
1662                 calculate_cdp_val(*cdp_val, pdp_temp_val, elapsed_pdp_st,
1663                                   current_cf, i, ii);
1664         }
1665     }
1668 /*
1669  * Set the CDP_primary_val and CDP_val to the appropriate initial value based
1670  * on the type of consolidation function.
1671  */
1672 static void initialize_cdp_val(
1673     unival *scratch,
1674     int current_cf,
1675     rrd_value_t pdp_temp_val,
1676     unsigned long elapsed_pdp_st,
1677     unsigned long start_pdp_offset,
1678     unsigned long pdp_cnt)
1680     rrd_value_t cum_val, cur_val;
1682     switch (current_cf) {
1683     case CF_AVERAGE:
1684         cum_val = IFDNAN(scratch[CDP_val].u_val, 0.0);
1685         cur_val = IFDNAN(pdp_temp_val, 0.0);
1686         scratch[CDP_primary_val].u_val =
1687             (cum_val + cur_val * start_pdp_offset) /
1688             (pdp_cnt - scratch[CDP_unkn_pdp_cnt].u_cnt);
1689         scratch[CDP_val].u_val =
1690             initialize_average_carry_over(pdp_temp_val, elapsed_pdp_st,
1691                                           start_pdp_offset, pdp_cnt);
1692         break;
1693     case CF_MAXIMUM:
1694         cum_val = IFDNAN(scratch[CDP_val].u_val, -DINF);
1695         cur_val = IFDNAN(pdp_temp_val, -DINF);
1696 #if 0
1697 #ifdef DEBUG
1698         if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1699             fprintf(stderr,
1700                     "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1701                     i, ii);
1702             exit(-1);
1703         }
1704 #endif
1705 #endif
1706         if (cur_val > cum_val)
1707             scratch[CDP_primary_val].u_val = cur_val;
1708         else
1709             scratch[CDP_primary_val].u_val = cum_val;
1710         /* initialize carry over value */
1711         scratch[CDP_val].u_val = pdp_temp_val;
1712         break;
1713     case CF_MINIMUM:
1714         cum_val = IFDNAN(scratch[CDP_val].u_val, DINF);
1715         cur_val = IFDNAN(pdp_temp_val, DINF);
1716 #if 0
1717 #ifdef DEBUG
1718         if (isnan(scratch[CDP_val].u_val) && isnan(pdp_temp)) {
1719             fprintf(stderr,
1720                     "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!", i,
1721                     ii);
1722             exit(-1);
1723         }
1724 #endif
1725 #endif
1726         if (cur_val < cum_val)
1727             scratch[CDP_primary_val].u_val = cur_val;
1728         else
1729             scratch[CDP_primary_val].u_val = cum_val;
1730         /* initialize carry over value */
1731         scratch[CDP_val].u_val = pdp_temp_val;
1732         break;
1733     case CF_LAST:
1734     default:
1735         scratch[CDP_primary_val].u_val = pdp_temp_val;
1736         /* initialize carry over value */
1737         scratch[CDP_val].u_val = pdp_temp_val;
1738         break;
1739     }
1742 /*
1743  * Update the consolidation function for Holt-Winters functions as
1744  * well as other functions that don't actually consolidate multiple
1745  * PDPs.
1746  */
1747 static void reset_cdp(
1748     rrd_t *rrd,
1749     unsigned long elapsed_pdp_st,
1750     rrd_value_t *pdp_temp,
1751     rrd_value_t *last_seasonal_coef,
1752     rrd_value_t *seasonal_coef,
1753     int rra_idx,
1754     int ds_idx,
1755     int cdp_idx,
1756     enum cf_en current_cf)
1758     unival   *scratch = rrd->cdp_prep[cdp_idx].scratch;
1760     switch (current_cf) {
1761     case CF_AVERAGE:
1762     default:
1763         scratch[CDP_primary_val].u_val = pdp_temp[ds_idx];
1764         scratch[CDP_secondary_val].u_val = pdp_temp[ds_idx];
1765         break;
1766     case CF_SEASONAL:
1767     case CF_DEVSEASONAL:
1768         /* need to update cached seasonal values, so they are consistent
1769          * with the bulk update */
1770         /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1771          * CDP_last_deviation are the same. */
1772         scratch[CDP_hw_last_seasonal].u_val = last_seasonal_coef[ds_idx];
1773         scratch[CDP_hw_seasonal].u_val = seasonal_coef[ds_idx];
1774         break;
1775     case CF_HWPREDICT:
1776     case CF_MHWPREDICT:
1777         /* need to update the null_count and last_null_count.
1778          * even do this for non-DNAN pdp_temp because the
1779          * algorithm is not learning from batch updates. */
1780         scratch[CDP_null_count].u_cnt += elapsed_pdp_st;
1781         scratch[CDP_last_null_count].u_cnt += elapsed_pdp_st - 1;
1782         /* fall through */
1783     case CF_DEVPREDICT:
1784         scratch[CDP_primary_val].u_val = DNAN;
1785         scratch[CDP_secondary_val].u_val = DNAN;
1786         break;
1787     case CF_FAILURES:
1788         /* do not count missed bulk values as failures */
1789         scratch[CDP_primary_val].u_val = 0;
1790         scratch[CDP_secondary_val].u_val = 0;
1791         /* need to reset violations buffer.
1792          * could do this more carefully, but for now, just
1793          * assume a bulk update wipes away all violations. */
1794         erase_violations(rrd, cdp_idx, rra_idx);
1795         break;
1796     }
1799 static rrd_value_t initialize_average_carry_over(
1800     rrd_value_t pdp_temp_val,
1801     unsigned long elapsed_pdp_st,
1802     unsigned long start_pdp_offset,
1803     unsigned long pdp_cnt)
1805     /* initialize carry over value */
1806     if (isnan(pdp_temp_val)) {
1807         return DNAN;
1808     }
1809     return pdp_temp_val * ((elapsed_pdp_st - start_pdp_offset) % pdp_cnt);
1812 /*
1813  * Update or initialize a CDP value based on the consolidation
1814  * function.
1815  *
1816  * Returns the new value.
1817  */
1818 static rrd_value_t calculate_cdp_val(
1819     rrd_value_t cdp_val,
1820     rrd_value_t pdp_temp_val,
1821     unsigned long elapsed_pdp_st,
1822     int current_cf,
1823 #ifdef DEBUG
1824     int i,
1825     int ii
1826 #else
1827     int UNUSED(i),
1828     int UNUSED(ii)
1829 #endif
1830     )
1832     if (isnan(cdp_val)) {
1833         if (current_cf == CF_AVERAGE) {
1834             pdp_temp_val *= elapsed_pdp_st;
1835         }
1836 #ifdef DEBUG
1837         fprintf(stderr, "Initialize CDP_val for RRA %d DS %d: %10.2f\n",
1838                 i, ii, pdp_temp_val);
1839 #endif
1840         return pdp_temp_val;
1841     }
1842     if (current_cf == CF_AVERAGE)
1843         return cdp_val + pdp_temp_val * elapsed_pdp_st;
1844     if (current_cf == CF_MINIMUM)
1845         return (pdp_temp_val < cdp_val) ? pdp_temp_val : cdp_val;
1846     if (current_cf == CF_MAXIMUM)
1847         return (pdp_temp_val > cdp_val) ? pdp_temp_val : cdp_val;
1849     return pdp_temp_val;
1852 /*
1853  * For each RRA, update the seasonal values and then call update_aberrant_CF
1854  * for each data source.
1855  *
1856  * Return 0 on success, -1 on error.
1857  */
1858 static int update_aberrant_cdps(
1859     rrd_t *rrd,
1860     rrd_file_t *rrd_file,
1861     unsigned long rra_begin,
1862     unsigned long elapsed_pdp_st,
1863     rrd_value_t *pdp_temp,
1864     rrd_value_t **seasonal_coef)
1866     unsigned long rra_idx, ds_idx, j;
1868     /* number of PDP steps since the last update that
1869      * are assigned to the first CDP to be generated
1870      * since the last update. */
1871     unsigned short scratch_idx;
1872     unsigned long rra_start;
1873     enum cf_en current_cf;
1875     /* this loop is only entered if elapsed_pdp_st < 3 */
1876     for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1877          j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val) {
1878         rra_start = rra_begin;
1879         for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1880             if (rrd->rra_def[rra_idx].pdp_cnt == 1) {
1881                 current_cf = cf_conv(rrd->rra_def[rra_idx].cf_nam);
1882                 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
1883                     if (scratch_idx == CDP_primary_val) {
1884                         lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1885                                         elapsed_pdp_st + 1, seasonal_coef);
1886                     } else {
1887                         lookup_seasonal(rrd, rra_idx, rra_start, rrd_file,
1888                                         elapsed_pdp_st + 2, seasonal_coef);
1889                     }
1890                 }
1891                 if (rrd_test_error())
1892                     return -1;
1893                 /* loop over data soures within each RRA */
1894                 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1895                     update_aberrant_CF(rrd, pdp_temp[ds_idx], current_cf,
1896                                        rra_idx * (rrd->stat_head->ds_cnt) +
1897                                        ds_idx, rra_idx, ds_idx, scratch_idx,
1898                                        *seasonal_coef);
1899                 }
1900             }
1901             rra_start += rrd->rra_def[rra_idx].row_cnt
1902                 * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
1903         }
1904     }
1905     return 0;
1908 /* 
1909  * Move sequentially through the file, writing one RRA at a time.  Note this
1910  * architecture divorces the computation of CDP with flushing updated RRA
1911  * entries to disk.
1912  *
1913  * Return 0 on success, -1 on error.
1914  */
1915 static int write_to_rras(
1916     rrd_t *rrd,
1917     rrd_file_t *rrd_file,
1918     unsigned long *rra_step_cnt,
1919     unsigned long rra_begin,
1920     time_t current_time,
1921     unsigned long *skip_update,
1922     rrd_info_t ** pcdp_summary)
1924     unsigned long rra_idx;
1925     unsigned long rra_start;
1926     time_t    rra_time = 0; /* time of update for a RRA */
1928     unsigned long ds_cnt = rrd->stat_head->ds_cnt;
1929     
1930     /* Ready to write to disk */
1931     rra_start = rra_begin;
1933     for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; rra_idx++) {
1934         rra_def_t *rra_def = &rrd->rra_def[rra_idx];
1935         rra_ptr_t *rra_ptr = &rrd->rra_ptr[rra_idx];
1937         /* for cdp_prep */
1938         unsigned short scratch_idx;
1939         unsigned long step_subtract;
1941         for (scratch_idx = CDP_primary_val,
1942                  step_subtract = 1;
1943              rra_step_cnt[rra_idx] > 0;
1944              rra_step_cnt[rra_idx]--,
1945                  scratch_idx = CDP_secondary_val,
1946                  step_subtract = 2) {
1948             off_t rra_pos_new;
1949 #ifdef DEBUG
1950             fprintf(stderr, "  -- RRA Preseek %ld\n", rrd_file->pos);
1951 #endif
1952             /* increment, with wrap-around */
1953             if (++rra_ptr->cur_row >= rra_def->row_cnt)
1954               rra_ptr->cur_row = 0;
1956             /* we know what our position should be */
1957             rra_pos_new = rra_start
1958               + ds_cnt * rra_ptr->cur_row * sizeof(rrd_value_t);
1960             /* re-seek if the position is wrong or we wrapped around */
1961             if (rra_pos_new != rrd_file->pos) {
1962                 if (rrd_seek(rrd_file, rra_pos_new, SEEK_SET) != 0) {
1963                     rrd_set_error("seek error in rrd");
1964                     return -1;
1965                 }
1966             }
1967 #ifdef DEBUG
1968             fprintf(stderr, "  -- RRA Postseek %ld\n", rrd_file->pos);
1969 #endif
1971             if (skip_update[rra_idx])
1972                 continue;
1974             if (*pcdp_summary != NULL) {
1975                 unsigned long step_time = rra_def->pdp_cnt * rrd->stat_head->pdp_step;
1977                 rra_time = (current_time - current_time % step_time)
1978                     - ((rra_step_cnt[rra_idx] - step_subtract) * step_time);
1979             }
1981             if (write_RRA_row
1982                 (rrd_file, rrd, rra_idx, scratch_idx,
1983                  pcdp_summary, rra_time) == -1)
1984                 return -1;
1985         }
1987         rra_start += rra_def->row_cnt * ds_cnt * sizeof(rrd_value_t);
1988     } /* RRA LOOP */
1990     return 0;
1993 /*
1994  * Write out one row of values (one value per DS) to the archive.
1995  *
1996  * Returns 0 on success, -1 on error.
1997  */
1998 static int write_RRA_row(
1999     rrd_file_t *rrd_file,
2000     rrd_t *rrd,
2001     unsigned long rra_idx,
2002     unsigned short CDP_scratch_idx,
2003     rrd_info_t ** pcdp_summary,
2004     time_t rra_time)
2006     unsigned long ds_idx, cdp_idx;
2007     rrd_infoval_t iv;
2009     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
2010         /* compute the cdp index */
2011         cdp_idx = rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
2012 #ifdef DEBUG
2013         fprintf(stderr, "  -- RRA WRITE VALUE %e, at %ld CF:%s\n",
2014                 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,
2015                 rrd_file->pos, rrd->rra_def[rra_idx].cf_nam);
2016 #endif
2017         if (*pcdp_summary != NULL) {
2018             iv.u_val = rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
2019             /* append info to the return hash */
2020             *pcdp_summary = rrd_info_push(*pcdp_summary,
2021                                           sprintf_alloc
2022                                           ("[%d]RRA[%s][%lu]DS[%s]", rra_time,
2023                                            rrd->rra_def[rra_idx].cf_nam,
2024                                            rrd->rra_def[rra_idx].pdp_cnt,
2025                                            rrd->ds_def[ds_idx].ds_nam),
2026                                           RD_I_VAL, iv);
2027         }
2028         if (rrd_write(rrd_file,
2029                       &(rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].
2030                         u_val), sizeof(rrd_value_t)) != sizeof(rrd_value_t)) {
2031             rrd_set_error("writing rrd: %s", rrd_strerror(errno));
2032             return -1;
2033         }
2034     }
2035     return 0;
2038 /*
2039  * Call apply_smoother for all DEVSEASONAL and SEASONAL RRAs.
2040  *
2041  * Returns 0 on success, -1 otherwise
2042  */
2043 static int smooth_all_rras(
2044     rrd_t *rrd,
2045     rrd_file_t *rrd_file,
2046     unsigned long rra_begin)
2048     unsigned long rra_start = rra_begin;
2049     unsigned long rra_idx;
2051     for (rra_idx = 0; rra_idx < rrd->stat_head->rra_cnt; ++rra_idx) {
2052         if (cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_DEVSEASONAL ||
2053             cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_SEASONAL) {
2054 #ifdef DEBUG
2055             fprintf(stderr, "Running smoother for rra %lu\n", rra_idx);
2056 #endif
2057             apply_smoother(rrd, rra_idx, rra_start, rrd_file);
2058             if (rrd_test_error())
2059                 return -1;
2060         }
2061         rra_start += rrd->rra_def[rra_idx].row_cnt
2062             * rrd->stat_head->ds_cnt * sizeof(rrd_value_t);
2063     }
2064     return 0;
2067 #ifndef HAVE_MMAP
2068 /*
2069  * Flush changes to disk (unless we're using mmap)
2070  *
2071  * Returns 0 on success, -1 otherwise
2072  */
2073 static int write_changes_to_disk(
2074     rrd_t *rrd,
2075     rrd_file_t *rrd_file,
2076     int version)
2078     /* we just need to write back the live header portion now */
2079     if (rrd_seek(rrd_file, (sizeof(stat_head_t)
2080                             + sizeof(ds_def_t) * rrd->stat_head->ds_cnt
2081                             + sizeof(rra_def_t) * rrd->stat_head->rra_cnt),
2082                  SEEK_SET) != 0) {
2083         rrd_set_error("seek rrd for live header writeback");
2084         return -1;
2085     }
2086     if (version >= 3) {
2087         if (rrd_write(rrd_file, rrd->live_head,
2088                       sizeof(live_head_t) * 1) != sizeof(live_head_t) * 1) {
2089             rrd_set_error("rrd_write live_head to rrd");
2090             return -1;
2091         }
2092     } else {
2093         if (rrd_write(rrd_file, rrd->legacy_last_up,
2094                       sizeof(time_t) * 1) != sizeof(time_t) * 1) {
2095             rrd_set_error("rrd_write live_head to rrd");
2096             return -1;
2097         }
2098     }
2101     if (rrd_write(rrd_file, rrd->pdp_prep,
2102                   sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)
2103         != (ssize_t) (sizeof(pdp_prep_t) * rrd->stat_head->ds_cnt)) {
2104         rrd_set_error("rrd_write pdp_prep to rrd");
2105         return -1;
2106     }
2108     if (rrd_write(rrd_file, rrd->cdp_prep,
2109                   sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2110                   rrd->stat_head->ds_cnt)
2111         != (ssize_t) (sizeof(cdp_prep_t) * rrd->stat_head->rra_cnt *
2112                       rrd->stat_head->ds_cnt)) {
2114         rrd_set_error("rrd_write cdp_prep to rrd");
2115         return -1;
2116     }
2118     if (rrd_write(rrd_file, rrd->rra_ptr,
2119                   sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)
2120         != (ssize_t) (sizeof(rra_ptr_t) * rrd->stat_head->rra_cnt)) {
2121         rrd_set_error("rrd_write rra_ptr to rrd");
2122         return -1;
2123     }
2124     return 0;
2126 #endif