Code

From Bernhard Fischer
[rrdtool.git] / src / rrd_update.c
1 /*****************************************************************************
2  * RRDtool 1.2.23  Copyright by Tobi Oetiker, 1997-2007
3  *****************************************************************************
4  * rrd_update.c  RRD Update Function
5  *****************************************************************************
6  * $Id$
7  *****************************************************************************/
9 #include "rrd_tool.h"
11 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
12 #include <sys/locking.h>
13 #include <sys/stat.h>
14 #include <io.h>
15 #endif
17 #include "rrd_hw.h"
18 #include "rrd_rpncalc.h"
20 #include "rrd_is_thread_safe.h"
21 #include "unused.h"
23 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
24 /*
25  * WIN32 does not have gettimeofday and struct timeval. This is a quick and dirty
26  * replacement.
27  */
28 #include <sys/timeb.h>
30 #ifndef __MINGW32__
/* Local definition of struct timeval, compiled only when __MINGW32__ is not defined. */
struct timeval {
    time_t tv_sec;  /* seconds */
    long tv_usec;   /* microseconds */
};
35 #endif
/* Timezone record taken as the second argument of the local gettimeofday() replacement. */
struct __timezone {
    int tz_minuteswest; /* minutes W of Greenwich */
    int tz_dsttime;     /* type of dst correction */
};
42 static int gettimeofday(
43     struct timeval *t,
44     struct __timezone *tz)
45 {
47     struct _timeb current_time;
49     _ftime(&current_time);
51     t->tv_sec = current_time.time;
52     t->tv_usec = current_time.millitm * 1000;
54     return 0;
55 }
57 #endif
/*
 * Bring a timeval with a negative microsecond part back into canonical
 * form by borrowing one second: tv_sec is decremented and tv_usec is
 * raised by 1000000.  A value whose tv_usec is already >= 0 is left
 * untouched.  Only a single borrow is performed.
 */
static inline void normalize_time(
    struct timeval *t)
{
    if (t->tv_usec >= 0)
        return;

    t->tv_usec += 1000000L;
    t->tv_sec -= 1;
}
71 static info_t *write_RRA_row(
72     rrd_file_t *rrd_file,
73     rrd_t *rrd,
74     unsigned long rra_idx,
75     unsigned long *rra_current,
76     unsigned short CDP_scratch_idx,
77     info_t *pcdp_summary,
78     time_t *rra_time)
79 {
80     unsigned long ds_idx, cdp_idx;
81     infoval   iv;
83     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
84         /* compute the cdp index */
85         cdp_idx = rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
86 #ifdef DEBUG
87         fprintf(stderr, "  -- RRA WRITE VALUE %e, at %ld CF:%s\n",
88                 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,
89                 rrd_file->pos, rrd->rra_def[rra_idx].cf_nam);
90 #endif
91         if (pcdp_summary != NULL) {
92             iv.u_val = rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
93             /* append info to the return hash */
94             pcdp_summary = info_push(pcdp_summary,
95                                      sprintf_alloc("[%d]RRA[%s][%lu]DS[%s]",
96                                                    *rra_time,
97                                                    rrd->rra_def[rra_idx].
98                                                    cf_nam,
99                                                    rrd->rra_def[rra_idx].
100                                                    pdp_cnt,
101                                                    rrd->ds_def[ds_idx].
102                                                    ds_nam), RD_I_VAL, iv);
103         }
104         if (rrd_write
105             (rrd_file,
106              &(rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
107              sizeof(rrd_value_t) * 1) != sizeof(rrd_value_t) * 1) {
108             rrd_set_error("writing rrd: %s", rrd_strerror(errno));
109             return 0;
110         }
111         *rra_current += sizeof(rrd_value_t);
112     }
113     return (pcdp_summary);
116 int       rrd_update_r(
117     const char *filename,
118     const char *tmplt,
119     int argc,
120     const char **argv);
121 int       _rrd_update(
122     const char *filename,
123     const char *tmplt,
124     int argc,
125     const char **argv,
126     info_t *);
/* Yield X unless it is NaN, in which case yield the fallback Y.
 * X may be evaluated twice, so it must not have side effects.
 * The expansion carries no trailing semicolon, so the macro can be used
 * inside larger expressions as well as in plain assignments. */
#define IFDNAN(X,Y) (isnan(X) ? (Y) : (X))
131 info_t   *rrd_update_v(
132     int argc,
133     char **argv)
135     char     *tmplt = NULL;
136     info_t   *result = NULL;
137     infoval   rc;
139     rc.u_int = -1;
140     optind = 0;
141     opterr = 0;         /* initialize getopt */
143     while (1) {
144         static struct option long_options[] = {
145             {"template", required_argument, 0, 't'},
146             {0, 0, 0, 0}
147         };
148         int       option_index = 0;
149         int       opt;
151         opt = getopt_long(argc, argv, "t:", long_options, &option_index);
153         if (opt == EOF)
154             break;
156         switch (opt) {
157         case 't':
158             tmplt = optarg;
159             break;
161         case '?':
162             rrd_set_error("unknown option '%s'", argv[optind - 1]);
163             goto end_tag;
164         }
165     }
167     /* need at least 2 arguments: filename, data. */
168     if (argc - optind < 2) {
169         rrd_set_error("Not enough arguments");
170         goto end_tag;
171     }
172     rc.u_int = 0;
173     result = info_push(NULL, sprintf_alloc("return_value"), RD_I_INT, rc);
174     rc.u_int = _rrd_update(argv[optind], tmplt,
175                            argc - optind - 1,
176                            (const char **) (argv + optind + 1), result);
177     result->value.u_int = rc.u_int;
178   end_tag:
179     return result;
/*
 * Command-line style update.
 *
 * argc/argv - option vector; "-t"/"--template" supplies the template,
 *             followed by the file name and at least one data argument.
 *
 * Returns the status of rrd_update_r(), or -1 (with the error recorded
 * via rrd_set_error()) when an unknown option is seen or fewer than two
 * non-option arguments remain.
 */
int rrd_update(
    int argc,
    char **argv)
{
    char     *tmplt = NULL;
    int       rc;

    optind = 0;
    opterr = 0;         /* initialize getopt */

    while (1) {
        static struct option long_options[] = {
            {"template", required_argument, 0, 't'},
            {0, 0, 0, 0}
        };
        int       option_index = 0;
        int       opt;

        opt = getopt_long(argc, argv, "t:", long_options, &option_index);

        if (opt == EOF)
            break;

        switch (opt) {
        case 't':
            tmplt = optarg;
            break;

        case '?':
            rrd_set_error("unknown option '%s'", argv[optind - 1]);
            return (-1);
        }
    }

    /* need at least 2 arguments: filename, data. */
    if (argc - optind < 2) {
        rrd_set_error("Not enough arguments");

        return -1;
    }

    /* everything after the file name is handed on as update data */
    rc = rrd_update_r(argv[optind], tmplt,
                      argc - optind - 1, (const char **) (argv + optind + 1));
    return rc;
}
/*
 * Update entry point that takes its inputs as explicit arguments.
 *
 * filename - RRD file to update
 * tmplt    - optional template string, may be NULL
 * argc     - number of entries in argv
 * argv     - update data strings
 *
 * Forwards to _rrd_update() with a NULL info list and returns its status.
 */
int rrd_update_r(
    const char *filename,
    const char *tmplt,
    int argc,
    const char **argv)
{
    return _rrd_update(filename, tmplt, argc, argv, NULL);
}
237 int _rrd_update(
238     const char *filename,
239     const char *tmplt,
240     int argc,
241     const char **argv,
242     info_t *pcdp_summary)
245     int       arg_i = 2;
246     short     j;
247     unsigned long i, ii, iii = 1;
249     unsigned long rra_begin;    /* byte pointer to the rra
250                                  * area in the rrd file.  this
251                                  * pointer never changes value */
252     unsigned long rra_start;    /* byte pointer to the rra
253                                  * area in the rrd file.  this
254                                  * pointer changes as each rrd is
255                                  * processed. */
256     unsigned long rra_current;  /* byte pointer to the current write
257                                  * spot in the rrd file. */
258     unsigned long rra_pos_tmp;  /* temporary byte pointer. */
259     double    interval, pre_int, post_int;  /* interval between this and
260                                              * the last run */
261     unsigned long proc_pdp_st;  /* which pdp_st was the last
262                                  * to be processed */
263     unsigned long occu_pdp_st;  /* when was the pdp_st
264                                  * before the last update
265                                  * time */
266     unsigned long proc_pdp_age; /* how old was the data in
267                                  * the pdp prep area when it
268                                  * was last updated */
269     unsigned long occu_pdp_age; /* how long ago was the last
270                                  * pdp_step time */
271     rrd_value_t *pdp_new;   /* prepare the incoming data
272                              * to be added the the
273                              * existing entry */
274     rrd_value_t *pdp_temp;  /* prepare the pdp values
275                              * to be added the the
276                              * cdp values */
278     long     *tmpl_idx; /* index representing the settings
279                            transported by the tmplt index */
280     unsigned long tmpl_cnt = 2; /* time and data */
282     rrd_t     rrd;
283     time_t    current_time = 0;
284     time_t    rra_time = 0; /* time of update for a RRA */
285     unsigned long current_time_usec = 0;    /* microseconds part of current time */
286     struct timeval tmp_time;    /* used for time conversion */
288     char    **updvals;
289     int       schedule_smooth = 0;
290     rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
292     /* a vector of future Holt-Winters seasonal coefs */
293     unsigned long elapsed_pdp_st;
295     /* number of elapsed PDP steps since last update */
296     unsigned long *rra_step_cnt = NULL;
298     /* number of rows to be updated in an RRA for a data
299      * value. */
300     unsigned long start_pdp_offset;
302     /* number of PDP steps since the last update that
303      * are assigned to the first CDP to be generated
304      * since the last update. */
305     unsigned short scratch_idx;
307     /* index into the CDP scratch array */
308     enum cf_en current_cf;
310     /* numeric id of the current consolidation function */
311     rpnstack_t rpnstack;    /* used for COMPUTE DS */
312     int       version;  /* rrd version */
313     char     *endptr;   /* used in the conversion */
314     rrd_file_t *rrd_file;
316     rpnstack_init(&rpnstack);
318     /* need at least 1 arguments: data. */
319     if (argc < 1) {
320         rrd_set_error("Not enough arguments");
321         goto err_out;
322     }
324     rrd_file = rrd_open(filename, &rrd, RRD_READWRITE);
325     if (rrd_file == NULL) {
326         goto err_free;
327     }
329     /* initialize time */
330     version = atoi(rrd.stat_head->version);
331     gettimeofday(&tmp_time, 0);
332     normalize_time(&tmp_time);
333     current_time = tmp_time.tv_sec;
334     if (version >= 3) {
335         current_time_usec = tmp_time.tv_usec;
336     } else {
337         current_time_usec = 0;
338     }
340     rra_current = rra_start = rra_begin = rrd_file->header_len;
341     /* This is defined in the ANSI C standard, section 7.9.5.3:
343        When a file is opened with udpate mode ('+' as the second
344        or third character in the ... list of mode argument
345        variables), both input and output may be performed on the
346        associated stream.  However, ...  input may not be directly
347        followed by output without an intervening call to a file
348        positioning function, unless the input operation encounters
349        end-of-file. */
350 #if 0                   //def HAVE_MMAP
351     rrd_filesize = rrd_file->file_size;
352     fseek(rrd_file->fd, 0, SEEK_END);
353     rrd_filesize = ftell(rrd_file->fd);
354     fseek(rrd_file->fd, rra_current, SEEK_SET);
355 #else
356 //    fseek(rrd_file->fd, 0, SEEK_CUR);
357 #endif
360     /* get exclusive lock to whole file.
361      * lock gets removed when we close the file.
362      */
363     if (LockRRD(rrd_file->fd) != 0) {
364         rrd_set_error("could not lock RRD");
365         goto err_close;
366     }
368     if ((updvals =
369          malloc(sizeof(char *) * (rrd.stat_head->ds_cnt + 1))) == NULL) {
370         rrd_set_error("allocating updvals pointer array");
371         goto err_close;
372     }
374     if ((pdp_temp = malloc(sizeof(rrd_value_t)
375                            * rrd.stat_head->ds_cnt)) == NULL) {
376         rrd_set_error("allocating pdp_temp ...");
377         goto err_free_updvals;
378     }
380     if ((tmpl_idx = malloc(sizeof(unsigned long)
381                            * (rrd.stat_head->ds_cnt + 1))) == NULL) {
382         rrd_set_error("allocating tmpl_idx ...");
383         goto err_free_pdp_temp;
384     }
385     /* initialize tmplt redirector */
386     /* default config example (assume DS 1 is a CDEF DS)
387        tmpl_idx[0] -> 0; (time)
388        tmpl_idx[1] -> 1; (DS 0)
389        tmpl_idx[2] -> 3; (DS 2)
390        tmpl_idx[3] -> 4; (DS 3) */
391     tmpl_idx[0] = 0;    /* time */
392     for (i = 1, ii = 1; i <= rrd.stat_head->ds_cnt; i++) {
393         if (dst_conv(rrd.ds_def[i - 1].dst) != DST_CDEF)
394             tmpl_idx[ii++] = i;
395     }
396     tmpl_cnt = ii;
398     if (tmplt) {
399         /* we should work on a writeable copy here */
400         char     *dsname;
401         unsigned int tmpl_len;
402         char     *tmplt_copy = strdup(tmplt);
404         dsname = tmplt_copy;
405         tmpl_cnt = 1;   /* the first entry is the time */
406         tmpl_len = strlen(tmplt_copy);
407         for (i = 0; i <= tmpl_len; i++) {
408             if (tmplt_copy[i] == ':' || tmplt_copy[i] == '\0') {
409                 tmplt_copy[i] = '\0';
410                 if (tmpl_cnt > rrd.stat_head->ds_cnt) {
411                     rrd_set_error
412                         ("tmplt contains more DS definitions than RRD");
413                     goto err_free_tmpl_idx;
414                 }
415                 if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd, dsname)) == -1) {
416                     rrd_set_error("unknown DS name '%s'", dsname);
417                     goto err_free_tmpl_idx;
418                 } else {
419                     /* the first element is always the time */
420                     tmpl_idx[tmpl_cnt - 1]++;
421                     /* go to the next entry on the tmplt_copy */
422                     dsname = &tmplt_copy[i + 1];
423                     /* fix the damage we did before */
424                     if (i < tmpl_len) {
425                         tmplt_copy[i] = ':';
426                     }
428                 }
429             }
430         }
431         free(tmplt_copy);
432     }
433     if ((pdp_new = malloc(sizeof(rrd_value_t)
434                           * rrd.stat_head->ds_cnt)) == NULL) {
435         rrd_set_error("allocating pdp_new ...");
436         goto err_free_tmpl_idx;
437     }
438 #if 0                   //def HAVE_MMAP
439     rrd_mmaped_file = mmap(0,
440                            rrd_file->file_len,
441                            PROT_READ | PROT_WRITE,
442                            MAP_SHARED, fileno(in_file), 0);
443     if (rrd_mmaped_file == MAP_FAILED) {
444         rrd_set_error("error mmapping file %s", filename);
445         free(updvals);
446         free(pdp_temp);
447         free(tmpl_idx);
448         rrd_free(&rrd);
449         rrd_close(rrd_file);
450         return (-1);
451     }
452 #ifdef USE_MADVISE
453     /* when we use mmaping we tell the kernel the mmap equivalent
454        of POSIX_FADV_RANDOM */
455     madvise(rrd_mmaped_file, rrd_filesize, POSIX_MADV_RANDOM);
456 #endif
457 #endif
458     /* loop through the arguments. */
459     for (arg_i = 0; arg_i < argc; arg_i++) {
460         char     *stepper = strdup(argv[arg_i]);
461         char     *step_start = stepper;
462         char     *p;
463         char     *parsetime_error = NULL;
464         enum { atstyle, normal } timesyntax;
465         struct rrd_time_value ds_tv;
467         if (stepper == NULL) {
468             rrd_set_error("failed duplication argv entry");
469             free(step_start);
470             goto err_free_pdp_new;
471         }
472         /* initialize all ds input to unknown except the first one
473            which has always got to be set */
474         memset(updvals + 1, 'U', rrd.stat_head->ds_cnt);
475         updvals[0] = stepper;
476         /* separate all ds elements; first must be examined separately
477            due to alternate time syntax */
478         if ((p = strchr(stepper, '@')) != NULL) {
479             timesyntax = atstyle;
480             *p = '\0';
481             stepper = p + 1;
482         } else if ((p = strchr(stepper, ':')) != NULL) {
483             timesyntax = normal;
484             *p = '\0';
485             stepper = p + 1;
486         } else {
487             rrd_set_error
488                 ("expected timestamp not found in data source from %s",
489                  argv[arg_i]);
490             free(step_start);
491             break;
492         }
493         ii = 1;
494         updvals[tmpl_idx[ii]] = stepper;
495         while (*stepper) {
496             if (*stepper == ':') {
497                 *stepper = '\0';
498                 ii++;
499                 if (ii < tmpl_cnt) {
500                     updvals[tmpl_idx[ii]] = stepper + 1;
501                 }
502             }
503             stepper++;
504         }
506         if (ii != tmpl_cnt - 1) {
507             rrd_set_error
508                 ("expected %lu data source readings (got %lu) from %s",
509                  tmpl_cnt - 1, ii, argv[arg_i]);
510             free(step_start);
511             break;
512         }
514         /* get the time from the reading ... handle N */
515         if (timesyntax == atstyle) {
516             if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
517                 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error);
518                 free(step_start);
519                 break;
520             }
521             if (ds_tv.type == RELATIVE_TO_END_TIME ||
522                 ds_tv.type == RELATIVE_TO_START_TIME) {
523                 rrd_set_error("specifying time relative to the 'start' "
524                               "or 'end' makes no sense here: %s", updvals[0]);
525                 free(step_start);
526                 break;
527             }
529             current_time = mktime(&ds_tv.tm) + ds_tv.offset;
531             current_time_usec = 0;  /* FIXME: how to handle usecs here ? */
533         } else if (strcmp(updvals[0], "N") == 0) {
534             gettimeofday(&tmp_time, 0);
535             normalize_time(&tmp_time);
536             current_time = tmp_time.tv_sec;
537             current_time_usec = tmp_time.tv_usec;
538         } else {
539             double    tmp;
541             tmp = strtod(updvals[0], 0);
542             current_time = floor(tmp);
543             current_time_usec =
544                 (long) ((tmp - (double) current_time) * 1000000.0);
545         }
546         /* dont do any correction for old version RRDs */
547         if (version < 3)
548             current_time_usec = 0;
550         if (current_time < rrd.live_head->last_up ||
551             (current_time == rrd.live_head->last_up &&
552              (long) current_time_usec <=
553              (long) rrd.live_head->last_up_usec)) {
554             rrd_set_error("illegal attempt to update using time %ld when "
555                           "last update time is %ld (minimum one second step)",
556                           current_time, rrd.live_head->last_up);
557             free(step_start);
558             break;
559         }
562         /* seek to the beginning of the rra's */
563         if (rra_current != rra_begin) {
564 #ifndef HAVE_MMAP
565             if (rrd_seek(rrd_file, rra_begin, SEEK_SET) != 0) {
566                 rrd_set_error("seek error in rrd");
567                 free(step_start);
568                 break;
569             }
570 #endif
571             rra_current = rra_begin;
572         }
573         rra_start = rra_begin;
575         /* when was the current pdp started */
576         proc_pdp_age = rrd.live_head->last_up % rrd.stat_head->pdp_step;
577         proc_pdp_st = rrd.live_head->last_up - proc_pdp_age;
579         /* when did the last pdp_st occur */
580         occu_pdp_age = current_time % rrd.stat_head->pdp_step;
581         occu_pdp_st = current_time - occu_pdp_age;
583         /* interval = current_time - rrd.live_head->last_up; */
584         interval = (double) (current_time - rrd.live_head->last_up)
585             + (double) ((long) current_time_usec -
586                         (long) rrd.live_head->last_up_usec) / 1000000.0;
588         if (occu_pdp_st > proc_pdp_st) {
589             /* OK we passed the pdp_st moment */
590             pre_int = (long) occu_pdp_st - rrd.live_head->last_up;  /* how much of the input data
591                                                                      * occurred before the latest
592                                                                      * pdp_st moment*/
593             pre_int -= ((double) rrd.live_head->last_up_usec) / 1000000.0;  /* adjust usecs */
594             post_int = occu_pdp_age;    /* how much after it */
595             post_int += ((double) current_time_usec) / 1000000.0;   /* adjust usecs */
596         } else {
597             pre_int = interval;
598             post_int = 0;
599         }
601 #ifdef DEBUG
602         printf("proc_pdp_age %lu\t"
603                "proc_pdp_st %lu\t"
604                "occu_pfp_age %lu\t"
605                "occu_pdp_st %lu\t"
606                "int %lf\t"
607                "pre_int %lf\t"
608                "post_int %lf\n", proc_pdp_age, proc_pdp_st,
609                occu_pdp_age, occu_pdp_st, interval, pre_int, post_int);
610 #endif
612         /* process the data sources and update the pdp_prep 
613          * area accordingly */
614         for (i = 0; i < rrd.stat_head->ds_cnt; i++) {
615             enum dst_en dst_idx;
617             dst_idx = dst_conv(rrd.ds_def[i].dst);
619             /* make sure we do not build diffs with old last_ds values */
620             if (rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt < interval) {
621                 strncpy(rrd.pdp_prep[i].last_ds, "U", LAST_DS_LEN - 1);
622                 rrd.pdp_prep[i].last_ds[LAST_DS_LEN - 1] = '\0';
623             }
625             /* NOTE: DST_CDEF should never enter this if block, because
626              * updvals[i+1][0] is initialized to 'U'; unless the caller
627              * accidently specified a value for the DST_CDEF. To handle
628              * this case, an extra check is required. */
630             if ((updvals[i + 1][0] != 'U') &&
631                 (dst_idx != DST_CDEF) &&
632                 rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {
633                 double    rate = DNAN;
635                 /* the data source type defines how to process the data */
636                 /* pdp_new contains rate * time ... eg the bytes
637                  * transferred during the interval. Doing it this way saves
638                  * a lot of math operations */
639                 switch (dst_idx) {
640                 case DST_COUNTER:
641                 case DST_DERIVE:
642                     if (rrd.pdp_prep[i].last_ds[0] != 'U') {
643                         for (ii = 0; updvals[i + 1][ii] != '\0'; ii++) {
644                             if ((updvals[i + 1][ii] < '0'
645                                  || updvals[i + 1][ii] > '9') && (ii != 0
646                                                                   && updvals[i
647                                                                              +
648                                                                              1]
649                                                                   [ii] !=
650                                                                   '-')) {
651                                 rrd_set_error("not a simple integer: '%s'",
652                                               updvals[i + 1]);
653                                 break;
654                             }
655                         }
656                         if (rrd_test_error()) {
657                             break;
658                         }
659                         pdp_new[i] =
660                             rrd_diff(updvals[i + 1], rrd.pdp_prep[i].last_ds);
661                         if (dst_idx == DST_COUNTER) {
662                             /* simple overflow catcher suggested by Andres Kroonmaa */
663                             /* this will fail terribly for non 32 or 64 bit counters ... */
664                             /* are there any others in SNMP land ? */
665                             if (pdp_new[i] < (double) 0.0)
666                                 pdp_new[i] += (double) 4294967296.0;    /* 2^32 */
667                             if (pdp_new[i] < (double) 0.0)
668                                 pdp_new[i] += (double) 18446744069414584320.0;
669                             /* 2^64-2^32 */ ;
670                         }
671                         rate = pdp_new[i] / interval;
672                     } else {
673                         pdp_new[i] = DNAN;
674                     }
675                     break;
676                 case DST_ABSOLUTE:
677                     errno = 0;
678                     pdp_new[i] = strtod(updvals[i + 1], &endptr);
679                     if (errno > 0) {
680                         rrd_set_error("converting '%s' to float: %s",
681                                       updvals[i + 1], rrd_strerror(errno));
682                         break;
683                     };
684                     if (endptr[0] != '\0') {
685                         rrd_set_error
686                             ("conversion of '%s' to float not complete: tail '%s'",
687                              updvals[i + 1], endptr);
688                         break;
689                     }
690                     rate = pdp_new[i] / interval;
691                     break;
692                 case DST_GAUGE:
693                     errno = 0;
694                     pdp_new[i] = strtod(updvals[i + 1], &endptr) * interval;
695                     if (errno > 0) {
696                         rrd_set_error("converting '%s' to float: %s",
697                                       updvals[i + 1], rrd_strerror(errno));
698                         break;
699                     };
700                     if (endptr[0] != '\0') {
701                         rrd_set_error
702                             ("conversion of '%s' to float not complete: tail '%s'",
703                              updvals[i + 1], endptr);
704                         break;
705                     }
706                     rate = pdp_new[i] / interval;
707                     break;
708                 default:
709                     rrd_set_error("rrd contains unknown DS type : '%s'",
710                                   rrd.ds_def[i].dst);
711                     break;
712                 }
713                 /* break out of this for loop if the error string is set */
714                 if (rrd_test_error()) {
715                     break;
716                 }
717                 /* make sure pdp_temp is neither too large or too small
718                  * if any of these occur it becomes unknown ...
719                  * sorry folks ... */
720                 if (!isnan(rate) &&
721                     ((!isnan(rrd.ds_def[i].par[DS_max_val].u_val) &&
722                       rate > rrd.ds_def[i].par[DS_max_val].u_val) ||
723                      (!isnan(rrd.ds_def[i].par[DS_min_val].u_val) &&
724                       rate < rrd.ds_def[i].par[DS_min_val].u_val))) {
725                     pdp_new[i] = DNAN;
726                 }
727             } else {
728                 /* no news is news all the same */
729                 pdp_new[i] = DNAN;
730             }
733             /* make a copy of the command line argument for the next run */
734 #ifdef DEBUG
735             fprintf(stderr,
736                     "prep ds[%lu]\t"
737                     "last_arg '%s'\t"
738                     "this_arg '%s'\t"
739                     "pdp_new %10.2f\n",
740                     i, rrd.pdp_prep[i].last_ds, updvals[i + 1], pdp_new[i]);
741 #endif
742             strncpy(rrd.pdp_prep[i].last_ds, updvals[i + 1], LAST_DS_LEN - 1);
743             rrd.pdp_prep[i].last_ds[LAST_DS_LEN - 1] = '\0';
744         }
745         /* break out of the argument parsing loop if the error_string is set */
746         if (rrd_test_error()) {
747             free(step_start);
748             break;
749         }
750         /* has a pdp_st moment occurred since the last run ? */
752         if (proc_pdp_st == occu_pdp_st) {
753             /* no we have not passed a pdp_st moment. therefore update is simple */
755             for (i = 0; i < rrd.stat_head->ds_cnt; i++) {
756                 if (isnan(pdp_new[i])) {
757                     /* this is not realy accurate if we use subsecond data arival time
758                        should have thought of it when going subsecond resolution ...
759                        sorry next format change we will have it! */
760                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt +=
761                         floor(interval);
762                 } else {
763                     if (isnan(rrd.pdp_prep[i].scratch[PDP_val].u_val)) {
764                         rrd.pdp_prep[i].scratch[PDP_val].u_val = pdp_new[i];
765                     } else {
766                         rrd.pdp_prep[i].scratch[PDP_val].u_val += pdp_new[i];
767                     }
768                 }
769 #ifdef DEBUG
770                 fprintf(stderr,
771                         "NO PDP  ds[%lu]\t"
772                         "value %10.2f\t"
773                         "unkn_sec %5lu\n",
774                         i,
775                         rrd.pdp_prep[i].scratch[PDP_val].u_val,
776                         rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
777 #endif
778             }
779         } else {
780             /* an pdp_st has occurred. */
782             /* in pdp_prep[].scratch[PDP_val].u_val we have collected
783                rate*seconds which occurred up to the last run.
784                pdp_new[] contains rate*seconds from the latest run.
785                pdp_temp[] will contain the rate for cdp */
787             for (i = 0; i < rrd.stat_head->ds_cnt; i++) {
788                 /* update pdp_prep to the current pdp_st. */
789                 double    pre_unknown = 0.0;
791                 if (isnan(pdp_new[i])) {
792                     /* a final bit of unkonwn to be added bevore calculation
793                        we use a temporary variable for this so that we
794                        don't have to turn integer lines before using the value */
795                     pre_unknown = pre_int;
796                 } else {
797                     if (isnan(rrd.pdp_prep[i].scratch[PDP_val].u_val)) {
798                         rrd.pdp_prep[i].scratch[PDP_val].u_val =
799                             pdp_new[i] / interval * pre_int;
800                     } else {
801                         rrd.pdp_prep[i].scratch[PDP_val].u_val +=
802                             pdp_new[i] / interval * pre_int;
803                     }
804                 }
807                 /* if too much of the pdp_prep is unknown we dump it */
808                 if (
809                        /* removed because this does not agree with the
810                           definition that a heartbeat can be unknown */
811                        /* (rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt
812                           > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) || */
813                        /* if the interval is larger thatn mrhb we get NAN */
814                        (interval > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) ||
815                        (occu_pdp_st - proc_pdp_st <=
816                         rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)) {
817                     pdp_temp[i] = DNAN;
818                 } else {
819                     pdp_temp[i] = rrd.pdp_prep[i].scratch[PDP_val].u_val
820                         / ((double) (occu_pdp_st - proc_pdp_st
821                                      -
822                                      rrd.pdp_prep[i].
823                                      scratch[PDP_unkn_sec_cnt].u_cnt)
824                            - pre_unknown);
825                 }
827                 /* process CDEF data sources; remember each CDEF DS can
828                  * only reference other DS with a lower index number */
829                 if (dst_conv(rrd.ds_def[i].dst) == DST_CDEF) {
830                     rpnp_t   *rpnp;
832                     rpnp =
833                         rpn_expand((rpn_cdefds_t *) &
834                                    (rrd.ds_def[i].par[DS_cdef]));
835                     /* substitute data values for OP_VARIABLE nodes */
836                     for (ii = 0; rpnp[ii].op != OP_END; ii++) {
837                         if (rpnp[ii].op == OP_VARIABLE) {
838                             rpnp[ii].op = OP_NUMBER;
839                             rpnp[ii].val = pdp_temp[rpnp[ii].ptr];
840                         }
841                     }
842                     /* run the rpn calculator */
843                     if (rpn_calc(rpnp, &rpnstack, 0, pdp_temp, i) == -1) {
844                         free(rpnp);
845                         break;  /* exits the data sources pdp_temp loop */
846                     }
847                 }
849                 /* make pdp_prep ready for the next run */
850                 if (isnan(pdp_new[i])) {
851                     /* this is not really accurate if we use subsecond data arrival time
852                        should have thought of it when going subsecond resolution ...
853                        sorry next format change we will have it! */
854                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt =
855                         floor(post_int);
856                     rrd.pdp_prep[i].scratch[PDP_val].u_val = DNAN;
857                 } else {
858                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = 0;
859                     rrd.pdp_prep[i].scratch[PDP_val].u_val =
860                         pdp_new[i] / interval * post_int;
861                 }
863 #ifdef DEBUG
864                 fprintf(stderr,
865                         "PDP UPD ds[%lu]\t"
866                         "pdp_temp %10.2f\t"
867                         "new_prep %10.2f\t"
868                         "new_unkn_sec %5lu\n",
869                         i, pdp_temp[i],
870                         rrd.pdp_prep[i].scratch[PDP_val].u_val,
871                         rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
872 #endif
873             }
875             /* if there were errors during the last loop, bail out here */
876             if (rrd_test_error()) {
877                 free(step_start);
878                 break;
879             }
881             /* compute the number of elapsed pdp_st moments */
882             elapsed_pdp_st =
883                 (occu_pdp_st - proc_pdp_st) / rrd.stat_head->pdp_step;
884 #ifdef DEBUG
885             fprintf(stderr, "elapsed PDP steps: %lu\n", elapsed_pdp_st);
886 #endif
887             if (rra_step_cnt == NULL) {
888                 rra_step_cnt = (unsigned long *)
889                     malloc((rrd.stat_head->rra_cnt) * sizeof(unsigned long));
890             }
892             for (i = 0, rra_start = rra_begin;
893                  i < rrd.stat_head->rra_cnt;
894                  rra_start +=
895                  rrd.rra_def[i].row_cnt * rrd.stat_head->ds_cnt *
896                  sizeof(rrd_value_t), i++) {
897                 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
898                 start_pdp_offset = rrd.rra_def[i].pdp_cnt -
899                     (proc_pdp_st / rrd.stat_head->pdp_step) %
900                     rrd.rra_def[i].pdp_cnt;
901                 if (start_pdp_offset <= elapsed_pdp_st) {
902                     rra_step_cnt[i] = (elapsed_pdp_st - start_pdp_offset) /
903                         rrd.rra_def[i].pdp_cnt + 1;
904                 } else {
905                     rra_step_cnt[i] = 0;
906                 }
908                 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
909                     /* If this is a bulk update, we need to skip ahead in
910                        the seasonal arrays so that they will be correct for
911                        the next observed value;
912                        note that for the bulk update itself, no update will
913                        occur to DEVSEASONAL or SEASONAL; furthermore, HWPREDICT
914                        and DEVPREDICT will be set to DNAN. */
915                     if (rra_step_cnt[i] > 2) {
916                         /* skip update by resetting rra_step_cnt[i],
917                            note that this is not data source specific; this is
918                            due to the bulk update, not a DNAN value for the
919                            specific data source. */
920                         rra_step_cnt[i] = 0;
921                         lookup_seasonal(&rrd, i, rra_start, rrd_file,
922                                         elapsed_pdp_st, &last_seasonal_coef);
923                         lookup_seasonal(&rrd, i, rra_start, rrd_file,
924                                         elapsed_pdp_st + 1, &seasonal_coef);
925                     }
927                     /* periodically run a smoother for seasonal effects */
928                     /* Need to use first cdp parameter buffer to track
929                      * burnin (burnin requires a specific smoothing schedule).
930                      * The CDP_init_seasonal parameter is really an RRA level,
931                      * not a data source within RRA level parameter, but the rra_def
932                      * is read only for rrd_update (not flushed to disk). */
933                     iii = i * (rrd.stat_head->ds_cnt);
934                     if (rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt
935                         <= BURNIN_CYCLES) {
936                         if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
937                             > rrd.rra_def[i].row_cnt - 1) {
938                             /* mark off one of the burnin cycles */
939                             ++(rrd.cdp_prep[iii].scratch[CDP_init_seasonal].
940                                u_cnt);
941                             schedule_smooth = 1;
942                         }
943                     } else {
944                         /* someone has no doubt invented a trick to deal with this
945                          * wrap around, but at least this code is clear. */
946                         if (rrd.rra_def[i].par[RRA_seasonal_smooth_idx].
947                             u_cnt > rrd.rra_ptr[i].cur_row) {
948                             /* here elapsed_pdp_st = rra_step_cnt[i] because of 1-1
949                              * mapping between PDP and CDP */
950                             if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
951                                 >=
952                                 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].
953                                 u_cnt) {
954 #ifdef DEBUG
955                                 fprintf(stderr,
956                                         "schedule_smooth 1: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
957                                         rrd.rra_ptr[i].cur_row,
958                                         elapsed_pdp_st,
959                                         rrd.rra_def[i].
960                                         par[RRA_seasonal_smooth_idx].u_cnt);
961 #endif
962                                 schedule_smooth = 1;
963                             }
964                         } else {
965                             /* can't rely on negative numbers because we are working with
966                              * unsigned values */
967                             /* Don't need modulus here. If we've wrapped more than once, only
968                              * one smooth is executed at the end. */
969                             if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st >=
970                                 rrd.rra_def[i].row_cnt
971                                 && rrd.rra_ptr[i].cur_row + elapsed_pdp_st -
972                                 rrd.rra_def[i].row_cnt >=
973                                 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].
974                                 u_cnt) {
975 #ifdef DEBUG
976                                 fprintf(stderr,
977                                         "schedule_smooth 2: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
978                                         rrd.rra_ptr[i].cur_row,
979                                         elapsed_pdp_st,
980                                         rrd.rra_def[i].
981                                         par[RRA_seasonal_smooth_idx].u_cnt);
982 #endif
983                                 schedule_smooth = 1;
984                             }
985                         }
986                     }
988                     rra_current = rrd_tell(rrd_file);
989                 }
990                 /* if cf is DEVSEASONAL or SEASONAL */
991                 if (rrd_test_error())
992                     break;
994                 /* update CDP_PREP areas */
995                 /* loop over data sources within each RRA */
996                 for (ii = 0; ii < rrd.stat_head->ds_cnt; ii++) {
998                     /* iii indexes the CDP prep area for this data source within the RRA */
999                     iii = i * rrd.stat_head->ds_cnt + ii;
1001                     if (rrd.rra_def[i].pdp_cnt > 1) {
1003                         if (rra_step_cnt[i] > 0) {
1004                             /* If we are in this block, at least 1 CDP value will be written to
1005                              * disk, this is the CDP_primary_val entry. If more than 1 value needs
1006                              * to be written, then the "fill in" value is the CDP_secondary_val
1007                              * entry. */
1008                             if (isnan(pdp_temp[ii])) {
1009                                 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
1010                                     u_cnt += start_pdp_offset;
1011                                 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
1012                                     u_val = DNAN;
1013                             } else {
1014                                 /* CDP_secondary value is the RRA "fill in" value for intermediary
1015                                  * CDP data entries. No matter the CF, the value is the same because
1016                                  * the average, max, min, and last of a list of identical values is
1017                                  * the same, namely, the value itself. */
1018                                 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
1019                                     u_val = pdp_temp[ii];
1020                             }
1022                             if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
1023                                 u_cnt >
1024                                 rrd.rra_def[i].pdp_cnt *
1025                                 rrd.rra_def[i].par[RRA_cdp_xff_val].u_val) {
1026                                 rrd.cdp_prep[iii].scratch[CDP_primary_val].
1027                                     u_val = DNAN;
1028                                 /* initialize carry over */
1029                                 if (current_cf == CF_AVERAGE) {
1030                                     if (isnan(pdp_temp[ii])) {
1031                                         rrd.cdp_prep[iii].scratch[CDP_val].
1032                                             u_val = DNAN;
1033                                     } else {
1034                                         rrd.cdp_prep[iii].scratch[CDP_val].
1035                                             u_val =
1036                                             pdp_temp[ii] *
1037                                             ((elapsed_pdp_st -
1038                                               start_pdp_offset) %
1039                                              rrd.rra_def[i].pdp_cnt);
1040                                     }
1041                                 } else {
1042                                     rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1043                                         pdp_temp[ii];
1044                                 }
1045                             } else {
1046                                 rrd_value_t cum_val, cur_val;
1048                                 switch (current_cf) {
1049                                 case CF_AVERAGE:
1050                                     cum_val =
1051                                         IFDNAN(rrd.cdp_prep[iii].
1052                                                scratch[CDP_val].u_val, 0.0);
1053                                     cur_val = IFDNAN(pdp_temp[ii], 0.0);
1054                                     rrd.cdp_prep[iii].
1055                                         scratch[CDP_primary_val].u_val =
1056                                         (cum_val +
1057                                          cur_val * start_pdp_offset) /
1058                                         (rrd.rra_def[i].pdp_cnt -
1059                                          rrd.cdp_prep[iii].
1060                                          scratch[CDP_unkn_pdp_cnt].u_cnt);
1061                                     /* initialize carry over value */
1062                                     if (isnan(pdp_temp[ii])) {
1063                                         rrd.cdp_prep[iii].scratch[CDP_val].
1064                                             u_val = DNAN;
1065                                     } else {
1066                                         rrd.cdp_prep[iii].scratch[CDP_val].
1067                                             u_val =
1068                                             pdp_temp[ii] *
1069                                             ((elapsed_pdp_st -
1070                                               start_pdp_offset) %
1071                                              rrd.rra_def[i].pdp_cnt);
1072                                     }
1073                                     break;
1074                                 case CF_MAXIMUM:
1075                                     cum_val =
1076                                         IFDNAN(rrd.cdp_prep[iii].
1077                                                scratch[CDP_val].u_val, -DINF);
1078                                     cur_val = IFDNAN(pdp_temp[ii], -DINF);
1079 #ifdef DEBUG
1080                                     if (isnan
1081                                         (rrd.cdp_prep[iii].scratch[CDP_val].
1082                                          u_val) && isnan(pdp_temp[ii])) {
1083                                         fprintf(stderr,
1084                                                 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1085                                                 i, ii);
1086                                         exit(-1);
1087                                     }
1088 #endif
1089                                     if (cur_val > cum_val)
1090                                         rrd.cdp_prep[iii].
1091                                             scratch[CDP_primary_val].u_val =
1092                                             cur_val;
1093                                     else
1094                                         rrd.cdp_prep[iii].
1095                                             scratch[CDP_primary_val].u_val =
1096                                             cum_val;
1097                                     /* initialize carry over value */
1098                                     rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1099                                         pdp_temp[ii];
1100                                     break;
1101                                 case CF_MINIMUM:
1102                                     cum_val =
1103                                         IFDNAN(rrd.cdp_prep[iii].
1104                                                scratch[CDP_val].u_val, DINF);
1105                                     cur_val = IFDNAN(pdp_temp[ii], DINF);
1106 #ifdef DEBUG
1107                                     if (isnan
1108                                         (rrd.cdp_prep[iii].scratch[CDP_val].
1109                                          u_val) && isnan(pdp_temp[ii])) {
1110                                         fprintf(stderr,
1111                                                 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1112                                                 i, ii);
1113                                         exit(-1);
1114                                     }
1115 #endif
1116                                     if (cur_val < cum_val)
1117                                         rrd.cdp_prep[iii].
1118                                             scratch[CDP_primary_val].u_val =
1119                                             cur_val;
1120                                     else
1121                                         rrd.cdp_prep[iii].
1122                                             scratch[CDP_primary_val].u_val =
1123                                             cum_val;
1124                                     /* initialize carry over value */
1125                                     rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1126                                         pdp_temp[ii];
1127                                     break;
1128                                 case CF_LAST:
1129                                 default:
1130                                     rrd.cdp_prep[iii].
1131                                         scratch[CDP_primary_val].u_val =
1132                                         pdp_temp[ii];
1133                                     /* initialize carry over value */
1134                                     rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1135                                         pdp_temp[ii];
1136                                     break;
1137                                 }
1138                             }   /* endif meets xff value requirement for a valid value */
1139                             /* initialize carry over CDP_unkn_pdp_cnt, this must happen after CDP_primary_val
1140                              * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1141                             if (isnan(pdp_temp[ii]))
1142                                 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
1143                                     u_cnt =
1144                                     (elapsed_pdp_st -
1145                                      start_pdp_offset) %
1146                                     rrd.rra_def[i].pdp_cnt;
1147                             else
1148                                 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
1149                                     u_cnt = 0;
1150                         } else {    /* rra_step_cnt[i]  == 0 */
1152 #ifdef DEBUG
1153                             if (isnan
1154                                 (rrd.cdp_prep[iii].scratch[CDP_val].u_val)) {
1155                                 fprintf(stderr,
1156                                         "schedule CDP_val update, RRA %lu DS %lu, DNAN\n",
1157                                         i, ii);
1158                             } else {
1159                                 fprintf(stderr,
1160                                         "schedule CDP_val update, RRA %lu DS %lu, %10.2f\n",
1161                                         i, ii,
1162                                         rrd.cdp_prep[iii].scratch[CDP_val].
1163                                         u_val);
1164                             }
1165 #endif
1166                             if (isnan(pdp_temp[ii])) {
1167                                 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
1168                                     u_cnt += elapsed_pdp_st;
1169                             } else
1170                                 if (isnan
1171                                     (rrd.cdp_prep[iii].scratch[CDP_val].
1172                                      u_val)) {
1173                                 if (current_cf == CF_AVERAGE) {
1174                                     rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1175                                         pdp_temp[ii] * elapsed_pdp_st;
1176                                 } else {
1177                                     rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1178                                         pdp_temp[ii];
1179                                 }
1180 #ifdef DEBUG
1181                                 fprintf(stderr,
1182                                         "Initialize CDP_val for RRA %lu DS %lu: %10.2f\n",
1183                                         i, ii,
1184                                         rrd.cdp_prep[iii].scratch[CDP_val].
1185                                         u_val);
1186 #endif
1187                             } else {
1188                                 switch (current_cf) {
1189                                 case CF_AVERAGE:
1190                                     rrd.cdp_prep[iii].scratch[CDP_val].
1191                                         u_val +=
1192                                         pdp_temp[ii] * elapsed_pdp_st;
1193                                     break;
1194                                 case CF_MINIMUM:
1195                                     if (pdp_temp[ii] <
1196                                         rrd.cdp_prep[iii].scratch[CDP_val].
1197                                         u_val)
1198                                         rrd.cdp_prep[iii].scratch[CDP_val].
1199                                             u_val = pdp_temp[ii];
1200                                     break;
1201                                 case CF_MAXIMUM:
1202                                     if (pdp_temp[ii] >
1203                                         rrd.cdp_prep[iii].scratch[CDP_val].
1204                                         u_val)
1205                                         rrd.cdp_prep[iii].scratch[CDP_val].
1206                                             u_val = pdp_temp[ii];
1207                                     break;
1208                                 case CF_LAST:
1209                                 default:
1210                                     rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1211                                         pdp_temp[ii];
1212                                     break;
1213                                 }
1214                             }
1215                         }
1216                     } else {    /* rrd.rra_def[i].pdp_cnt == 1 */
1217                         if (elapsed_pdp_st > 2) {
1218                             switch (current_cf) {
1219                             case CF_AVERAGE:
1220                             default:
1221                                 rrd.cdp_prep[iii].scratch[CDP_primary_val].
1222                                     u_val = pdp_temp[ii];
1223                                 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
1224                                     u_val = pdp_temp[ii];
1225                                 break;
1226                             case CF_SEASONAL:
1227                             case CF_DEVSEASONAL:
1228                                 /* need to update cached seasonal values, so they are consistent
1229                                  * with the bulk update */
1230                                 /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1231                                  * CDP_last_deviation are the same. */
1232                                 rrd.cdp_prep[iii].
1233                                     scratch[CDP_hw_last_seasonal].u_val =
1234                                     last_seasonal_coef[ii];
1235                                 rrd.cdp_prep[iii].scratch[CDP_hw_seasonal].
1236                                     u_val = seasonal_coef[ii];
1237                                 break;
1238                             case CF_HWPREDICT:
1239                                 /* need to update the null_count and last_null_count.
1240                                  * even do this for non-DNAN pdp_temp because the
1241                                  * algorithm is not learning from batch updates. */
1242                                 rrd.cdp_prep[iii].scratch[CDP_null_count].
1243                                     u_cnt += elapsed_pdp_st;
1244                                 rrd.cdp_prep[iii].
1245                                     scratch[CDP_last_null_count].u_cnt +=
1246                                     elapsed_pdp_st - 1;
1247                                 /* fall through */
1248                             case CF_DEVPREDICT:
1249                                 rrd.cdp_prep[iii].scratch[CDP_primary_val].
1250                                     u_val = DNAN;
1251                                 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
1252                                     u_val = DNAN;
1253                                 break;
1254                             case CF_FAILURES:
1255                                 /* do not count missed bulk values as failures */
1256                                 rrd.cdp_prep[iii].scratch[CDP_primary_val].
1257                                     u_val = 0;
1258                                 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
1259                                     u_val = 0;
1260                                 /* need to reset violations buffer.
1261                                  * could do this more carefully, but for now, just
1262                                  * assume a bulk update wipes away all violations. */
1263                                 erase_violations(&rrd, iii, i);
1264                                 break;
1265                             }
1266                         }
1267                     }   /* endif rrd.rra_def[i].pdp_cnt == 1 */
1269                     if (rrd_test_error())
1270                         break;
1272                 }       /* endif data sources loop */
1273             }           /* end RRA Loop */
1275             /* this loop is only entered if elapsed_pdp_st < 3 */
1276             for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1277                  j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val) {
1278                 for (i = 0, rra_start = rra_begin;
1279                      i < rrd.stat_head->rra_cnt;
1280                      rra_start +=
1281                      rrd.rra_def[i].row_cnt * rrd.stat_head->ds_cnt *
1282                      sizeof(rrd_value_t), i++) {
1283                     if (rrd.rra_def[i].pdp_cnt > 1)
1284                         continue;
1286                     current_cf = cf_conv(rrd.rra_def[i].cf_nam);
1287                     if (current_cf == CF_SEASONAL
1288                         || current_cf == CF_DEVSEASONAL) {
1289                         lookup_seasonal(&rrd, i, rra_start, rrd_file,
1290                                         elapsed_pdp_st + (scratch_idx ==
1291                                                           CDP_primary_val ? 1
1292                                                           : 2),
1293                                         &seasonal_coef);
1294                         rra_current = rrd_tell(rrd_file);
1295                     }
1296                     if (rrd_test_error())
1297                         break;
1298                     /* loop over data sources within each RRA */
1299                     for (ii = 0; ii < rrd.stat_head->ds_cnt; ii++) {
1300                         update_aberrant_CF(&rrd, pdp_temp[ii], current_cf,
1301                                            i * (rrd.stat_head->ds_cnt) + ii,
1302                                            i, ii, scratch_idx, seasonal_coef);
1303                     }
1304                 }       /* end RRA Loop */
1305                 if (rrd_test_error())
1306                     break;
1307             }           /* end elapsed_pdp_st loop */
1309             if (rrd_test_error())
1310                 break;
1312             /* Ready to write to disk */
1313             /* Move sequentially through the file, writing one RRA at a time.
1314              * Note this architecture divorces the computation of CDP with
1315              * flushing updated RRA entries to disk. */
1316             for (i = 0, rra_start = rra_begin;
1317                  i < rrd.stat_head->rra_cnt;
1318                  rra_start +=
1319                  rrd.rra_def[i].row_cnt * rrd.stat_head->ds_cnt *
1320                  sizeof(rrd_value_t), i++) {
1321                 /* is there anything to write for this RRA? If not, continue. */
1322                 if (rra_step_cnt[i] == 0)
1323                     continue;
1325                 /* write the first row */
1326 #ifdef DEBUG
1327                 fprintf(stderr, "  -- RRA Preseek %ld\n", rrd_file->pos);
1328 #endif
1329                 rrd.rra_ptr[i].cur_row++;
1330                 if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
1331                     rrd.rra_ptr[i].cur_row = 0; /* wrap around */
1332                 /* position on the first row */
1333                 rra_pos_tmp = rra_start +
1334                     (rrd.stat_head->ds_cnt) * (rrd.rra_ptr[i].cur_row) *
1335                     sizeof(rrd_value_t);
1336                 if (rra_pos_tmp != rra_current) {
1337 #ifndef HAVE_MMAP
1338                     if (rrd_seek(rrd_file, rra_pos_tmp, SEEK_SET) != 0) {
1339                         rrd_set_error("seek error in rrd");
1340                         break;
1341                     }
1342 #endif
1343                     rra_current = rra_pos_tmp;
1344                 }
1345 #ifdef DEBUG
1346                 fprintf(stderr, "  -- RRA Postseek %ld\n", rrd_file->pos);
1347 #endif
1348                 scratch_idx = CDP_primary_val;
1349                 if (pcdp_summary != NULL) {
1350                     rra_time = (current_time - current_time
1351                                 % (rrd.rra_def[i].pdp_cnt *
1352                                    rrd.stat_head->pdp_step))
1353                         -
1354                         ((rra_step_cnt[i] -
1355                           1) * rrd.rra_def[i].pdp_cnt *
1356                          rrd.stat_head->pdp_step);
1357                 }
1358                 pcdp_summary =
1359                     write_RRA_row(rrd_file, &rrd, i, &rra_current,
1360                                   scratch_idx, pcdp_summary, &rra_time);
1361                 if (rrd_test_error())
1362                     break;
1364                 /* write other rows of the bulk update, if any */
1365                 scratch_idx = CDP_secondary_val;
1366                 for (; rra_step_cnt[i] > 1; rra_step_cnt[i]--) {
1367                     if (++rrd.rra_ptr[i].cur_row == rrd.rra_def[i].row_cnt) {
1368 #ifdef DEBUG
1369                         fprintf(stderr,
1370                                 "Wraparound for RRA %s, %lu updates left\n",
1371                                 rrd.rra_def[i].cf_nam, rra_step_cnt[i] - 1);
1372 #endif
1373                         /* wrap */
1374                         rrd.rra_ptr[i].cur_row = 0;
1375                         /* seek back to beginning of current rra */
1376                         if (rrd_seek(rrd_file, rra_start, SEEK_SET) != 0) {
1377                             rrd_set_error("seek error in rrd");
1378                             break;
1379                         }
1380 #ifdef DEBUG
1381                         fprintf(stderr, "  -- Wraparound Postseek %ld\n",
1382                                 rrd_file->pos);
1383 #endif
1384                         rra_current = rra_start;
1385                     }
1386                     if (pcdp_summary != NULL) {
1387                         rra_time = (current_time - current_time
1388                                     % (rrd.rra_def[i].pdp_cnt *
1389                                        rrd.stat_head->pdp_step))
1390                             -
1391                             ((rra_step_cnt[i] -
1392                               2) * rrd.rra_def[i].pdp_cnt *
1393                              rrd.stat_head->pdp_step);
1394                     }
1395                     pcdp_summary =
1396                         write_RRA_row(rrd_file, &rrd, i, &rra_current,
1397                                       scratch_idx, pcdp_summary, &rra_time);
1398                 }
1400                 if (rrd_test_error())
1401                     break;
1402             }           /* RRA LOOP */
1404             /* break out of the argument parsing loop if error_string is set */
1405             if (rrd_test_error()) {
1406                 free(step_start);
1407                 break;
1408             }
1410         }               /* endif a pdp_st has occurred */
1411         rrd.live_head->last_up = current_time;
1412         rrd.live_head->last_up_usec = current_time_usec;
1413         free(step_start);
1414     }                   /* function argument loop */
1416     if (seasonal_coef != NULL)
1417         free(seasonal_coef);
1418     if (last_seasonal_coef != NULL)
1419         free(last_seasonal_coef);
1420     if (rra_step_cnt != NULL)
1421         free(rra_step_cnt);
1422     rpnstack_free(&rpnstack);
1424 #if 0                   //def HAVE_MMAP
1425     if (munmap(rrd_file->file_start, rrd_file->file_len) == -1) {
1426         rrd_set_error("error writing(unmapping) file: %s", filename);
1427     }
1428 #else
1429     //rrd_flush(rrd_file);    //XXX: really needed?
1430 #endif
1431     /* if we got here and if there is an error and if the file has not been
1432      * written to, then close things up and return. */
1433     if (rrd_test_error()) {
1434         goto err_free_pdp_new;
1435     }
1437     /* aargh ... that was tough ... so many loops ... anyway, it's done.
1438      * we just need to write back the live header portion now */
1440     if (rrd_seek(rrd_file, (sizeof(stat_head_t)
1441                             + sizeof(ds_def_t) * rrd.stat_head->ds_cnt
1442                             + sizeof(rra_def_t) * rrd.stat_head->rra_cnt),
1443                  SEEK_SET) != 0) {
1444         rrd_set_error("seek rrd for live header writeback");
1445         goto err_free_pdp_new;
1446     }
1448     if (version >= 3) {
1449         if (rrd_write(rrd_file, rrd.live_head,
1450                       sizeof(live_head_t) * 1) != sizeof(live_head_t) * 1) {
1451             rrd_set_error("rrd_write live_head to rrd");
1452             goto err_free_pdp_new;
1453         }
1454     } else {
1455         if (rrd_write(rrd_file, &rrd.live_head->last_up,
1456                       sizeof(time_t) * 1) != sizeof(time_t) * 1) {
1457             rrd_set_error("rrd_write live_head to rrd");
1458             goto err_free_pdp_new;
1459         }
1460     }
1463     if (rrd_write(rrd_file, rrd.pdp_prep,
1464                   sizeof(pdp_prep_t) * rrd.stat_head->ds_cnt)
1465         != (ssize_t) (sizeof(pdp_prep_t) * rrd.stat_head->ds_cnt)) {
1466         rrd_set_error("rrd_write pdp_prep to rrd");
1467         goto err_free_pdp_new;
1468     }
1470     if (rrd_write(rrd_file, rrd.cdp_prep,
1471                   sizeof(cdp_prep_t) * rrd.stat_head->rra_cnt *
1472                   rrd.stat_head->ds_cnt)
1473         != (ssize_t) (sizeof(cdp_prep_t) * rrd.stat_head->rra_cnt *
1474                       rrd.stat_head->ds_cnt)) {
1476         rrd_set_error("rrd_write cdp_prep to rrd");
1477         goto err_free_pdp_new;
1478     }
1480     if (rrd_write(rrd_file, rrd.rra_ptr,
1481                   sizeof(rra_ptr_t) * rrd.stat_head->rra_cnt)
1482         != (ssize_t) (sizeof(rra_ptr_t) * rrd.stat_head->rra_cnt)) {
1483         rrd_set_error("rrd_write rra_ptr to rrd");
1484         goto err_free_pdp_new;
1485     }
1486 #ifdef HAVE_POSIX_FADVISExxx
1488     /* with update we have write ops, so they will probably not be done by now, this means
1489        the buffers will not get freed. But calling this for the whole file - header
1490        will let the data off the hook as soon as it is written, even if it is from a previous
1491        update cycle. Calling fdsync to force things is much too hard here. */
1493     if (0 != posix_fadvise(rrd_file->fd, rra_begin, 0, POSIX_FADV_DONTNEED)) {
1494         rrd_set_error("setting POSIX_FADV_DONTNEED on '%s': %s", filename,
1495                       rrd_strerror(errno));
1496         goto err_free_pdp_new;
1497     }
1498 #endif
1499     /* rrd_flush(rrd_file); */
1501     /* calling the smoothing code here guarantees at most
1502      * one smoothing operation per rrd_update call. Unfortunately,
1503      * it is possible with bulk updates, or a long-delayed update
1504      * for smoothing to occur off-schedule. This really isn't
1505      * critical except during the burning cycles. */
1506     if (schedule_smooth) {
1508         rra_start = rra_begin;
1509         for (i = 0; i < rrd.stat_head->rra_cnt; ++i) {
1510             if (cf_conv(rrd.rra_def[i].cf_nam) == CF_DEVSEASONAL ||
1511                 cf_conv(rrd.rra_def[i].cf_nam) == CF_SEASONAL) {
1512 #ifdef DEBUG
1513                 fprintf(stderr, "Running smoother for rra %ld\n", i);
1514 #endif
1515                 apply_smoother(&rrd, i, rra_start, rrd_file);
1516                 if (rrd_test_error())
1517                     break;
1518             }
1519             rra_start += rrd.rra_def[i].row_cnt
1520                 * rrd.stat_head->ds_cnt * sizeof(rrd_value_t);
1521         }
1522 #ifdef HAVE_POSIX_FADVISExxx
1523         /* same procedure as above ... */
1524         if (0 !=
1525             posix_fadvise(rrd_file->fd, rra_begin, 0, POSIX_FADV_DONTNEED)) {
1526             rrd_set_error("setting POSIX_FADV_DONTNEED on '%s': %s", filename,
1527                           rrd_strerror(errno));
1528             goto err_free_pdp_new;
1529         }
1530 #endif
1531     }
1533     rrd_free(&rrd);
1534     rrd_close(rrd_file);
1536     free(pdp_new);
1537     free(tmpl_idx);
1538     free(pdp_temp);
1539     free(updvals);
1540     return (0);
1542   err_free_pdp_new:
1543     free(pdp_new);
1544   err_free_tmpl_idx:
1545     free(tmpl_idx);
1546   err_free_pdp_temp:
1547     free(pdp_temp);
1548   err_free_updvals:
1549     free(updvals);
1550   err_close:
1551     rrd_close(rrd_file);
1552   err_free:
1553     rrd_free(&rrd);
1554   err_out:
1555     return (-1);
1558 /*
1559  * Get an exclusive lock on the whole file.
1560  * The lock is removed when the file is closed.
1561  *
1562  * Returns 0 on success.
1563  */
1564 int LockRRD(
1565     int in_file)
1567     int       rcstat;
1569     {
1570 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
1571         struct _stat st;
1573         if (_fstat(in_file, &st) == 0) {
1574             rcstat = _locking(in_file, _LK_NBLCK, st.st_size);
1575         } else {
1576             rcstat = -1;
1577         }
1578 #else
1579         struct flock lock;
1581         lock.l_type = F_WRLCK;  /* exclusive write lock */
1582         lock.l_len = 0; /* whole file */
1583         lock.l_start = 0;   /* start of file */
1584         lock.l_whence = SEEK_SET;   /* l_start is relative to start of file */
1586         rcstat = fcntl(in_file, F_SETLK, &lock);
1587 #endif
1588     }
1590     return (rcstat);