Code

make sure we check input even when the previous update was a 'U' ... and some indenti...
[rrdtool.git] / src / rrd_update.c
1 /*****************************************************************************
2  * RRDtool 1.2.23  Copyright by Tobi Oetiker, 1997-2007
3  *****************************************************************************
4  * rrd_update.c  RRD Update Function
5  *****************************************************************************
6  * $Id$
7  *****************************************************************************/
9 #include "rrd_tool.h"
11 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
12 #include <sys/locking.h>
13 #include <sys/stat.h>
14 #include <io.h>
15 #endif
17 #include <locale.h>
19 #include "rrd_hw.h"
20 #include "rrd_rpncalc.h"
22 #include "rrd_is_thread_safe.h"
23 #include "unused.h"
25 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
26 /*
27  * WIN32 does not have gettimeofday     and struct timeval. This is a quick and dirty
28  * replacement.
29  */
30 #include <sys/timeb.h>
32 #ifndef __MINGW32__
33 struct timeval {
34     time_t    tv_sec;   /* seconds */
35     long      tv_usec;  /* microseconds */
36 };
37 #endif
39 struct __timezone {
40     int       tz_minuteswest;   /* minutes W of Greenwich */
41     int       tz_dsttime;   /* type of dst correction */
42 };
44 static int gettimeofday(
45     struct timeval *t,
46     struct __timezone *tz)
47 {
49     struct _timeb current_time;
51     _ftime(&current_time);
53     t->tv_sec = current_time.time;
54     t->tv_usec = current_time.millitm * 1000;
56     return 0;
57 }
59 #endif
/*
 * Normalize a time value so that its microsecond part is never negative.
 *
 * t - time value adjusted in place. When tv_usec is negative, whole
 *     seconds are borrowed from tv_sec until tv_usec lies in
 *     [0, 999999]. Values whose tv_usec is already >= 0 are left
 *     untouched (a tv_usec >= 1000000 is NOT folded into tv_sec).
 *
 * The borrow is computed arithmetically rather than by a single fixed
 * subtraction, so inputs at or below -1000000 usec are handled too.
 */
static inline void normalize_time(
    struct timeval *t)
{
    if (t->tv_usec < 0) {
        /* C division truncates toward zero: whole <= 0, rest in (-1000000, 0] */
        long      whole = (long) (t->tv_usec / 1000000L);
        long      rest = (long) (t->tv_usec % 1000000L);

        t->tv_sec += whole;
        if (rest < 0) {
            /* borrow one more second to lift the remainder above zero */
            t->tv_sec--;
            rest += 1000000L;
        }
        t->tv_usec = rest;
    }
}
73 static inline info_t *write_RRA_row(
74     rrd_file_t *rrd_file,
75     rrd_t *rrd,
76     unsigned long rra_idx,
77     unsigned long *rra_current,
78     unsigned short CDP_scratch_idx,
79     info_t *pcdp_summary,
80     time_t *rra_time)
81 {
82     unsigned long ds_idx, cdp_idx;
83     infoval   iv;
85     for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
86         /* compute the cdp index */
87         cdp_idx = rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
88 #ifdef DEBUG
89         fprintf(stderr, "  -- RRA WRITE VALUE %e, at %ld CF:%s\n",
90                 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,
91                 rrd_file->pos, rrd->rra_def[rra_idx].cf_nam);
92 #endif
93         if (pcdp_summary != NULL) {
94             iv.u_val = rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
95             /* append info to the return hash */
96             pcdp_summary = info_push(pcdp_summary,
97                                      sprintf_alloc("[%d]RRA[%s][%lu]DS[%s]",
98                                                    *rra_time,
99                                                    rrd->rra_def[rra_idx].
100                                                    cf_nam,
101                                                    rrd->rra_def[rra_idx].
102                                                    pdp_cnt,
103                                                    rrd->ds_def[ds_idx].
104                                                    ds_nam), RD_I_VAL, iv);
105         }
106         if (rrd_write
107             (rrd_file,
108              &(rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
109              sizeof(rrd_value_t)) != sizeof(rrd_value_t)) {
110             rrd_set_error("writing rrd: %s", rrd_strerror(errno));
111             return 0;
112         }
113         *rra_current += sizeof(rrd_value_t);
114     }
115     return (pcdp_summary);
118 int       rrd_update_r(
119     const char *filename,
120     const char *tmplt,
121     int argc,
122     const char **argv);
123 int       _rrd_update(
124     const char *filename,
125     const char *tmplt,
126     int argc,
127     const char **argv,
128     info_t *);
/*
 * IFDNAN(X, Y): yields Y when X is NaN, otherwise X.
 * X is evaluated twice, so pass expressions without side effects.
 * The expansion deliberately carries no trailing semicolon, so the macro
 * can be used inside larger expressions as well as in plain assignments.
 */
#define IFDNAN(X,Y) (isnan(X) ? (Y) : (X))
133 info_t   *rrd_update_v(
134     int argc,
135     char **argv)
137     char     *tmplt = NULL;
138     info_t   *result = NULL;
139     infoval   rc;
140     struct option long_options[] = {
141         {"template", required_argument, 0, 't'},
142         {0, 0, 0, 0}
143     };
145     rc.u_int = -1;
146     optind = 0;
147     opterr = 0;         /* initialize getopt */
149     while (1) {
150         int       option_index = 0;
151         int       opt;
153         opt = getopt_long(argc, argv, "t:", long_options, &option_index);
155         if (opt == EOF)
156             break;
158         switch (opt) {
159         case 't':
160             tmplt = optarg;
161             break;
163         case '?':
164             rrd_set_error("unknown option '%s'", argv[optind - 1]);
165             goto end_tag;
166         }
167     }
169     /* need at least 2 arguments: filename, data. */
170     if (argc - optind < 2) {
171         rrd_set_error("Not enough arguments");
172         goto end_tag;
173     }
174     rc.u_int = 0;
175     result = info_push(NULL, sprintf_alloc("return_value"), RD_I_INT, rc);
176     rc.u_int = _rrd_update(argv[optind], tmplt,
177                            argc - optind - 1,
178                            (const char **) (argv + optind + 1), result);
179     result->value.u_int = rc.u_int;
180   end_tag:
181     return result;
/*
 * Update an RRD from a command-line style argument vector.
 *
 * Recognised option: -t / --template <tmplt>. After the options at least
 * two arguments are required: the file name followed by one or more data
 * strings.
 *
 * Returns the result of rrd_update_r(), or -1 (with the reason set through
 * rrd_set_error) when an unknown option is met or too few arguments remain.
 */
int rrd_update(
    int argc,
    char **argv)
{
    struct option long_options[] = {
        {"template", required_argument, 0, 't'},
        {0, 0, 0, 0}
    };
    int       option_index = 0;
    int       opt;
    const char *tmplt = NULL;

    optind = 0;
    opterr = 0;         /* initialize getopt */

    while ((opt = getopt_long(argc, argv, "t:", long_options,
                              &option_index)) != EOF) {
        switch (opt) {
        case 't':
            /* optarg points into argv, which stays valid for the whole
             * call, and rrd_update_r() only takes a const pointer. Using
             * it directly (as rrd_update_v does) avoids a heap copy that
             * would have to be released on every exit path. */
            tmplt = optarg;
            break;

        case '?':
            rrd_set_error("unknown option '%s'", argv[optind - 1]);
            return -1;

        default:
            break;
        }
    }

    /* need at least 2 arguments: filename, data. */
    if (argc - optind < 2) {
        rrd_set_error("Not enough arguments");
        return -1;
    }

    return rrd_update_r(argv[optind], tmplt,
                        argc - optind - 1,
                        (const char **) (argv + optind + 1));
}
230 int rrd_update_r(
231     const char *filename,
232     const char *tmplt,
233     int argc,
234     const char **argv)
236     return _rrd_update(filename, tmplt, argc, argv, NULL);
239 int _rrd_update(
240     const char *filename,
241     const char *tmplt,
242     int argc,
243     const char **argv,
244     info_t *pcdp_summary)
247     int       arg_i = 2;
248     short     j;
249     unsigned long i, ii, iii = 1;
251     unsigned long rra_begin;    /* byte pointer to the rra
252                                  * area in the rrd file.  this
253                                  * pointer never changes value */
254     unsigned long rra_start;    /* byte pointer to the rra
255                                  * area in the rrd file.  this
256                                  * pointer changes as each rrd is
257                                  * processed. */
258     unsigned long rra_current;  /* byte pointer to the current write
259                                  * spot in the rrd file. */
260     unsigned long rra_pos_tmp;  /* temporary byte pointer. */
261     double    interval, pre_int, post_int;  /* interval between this and
262                                              * the last run */
263     unsigned long proc_pdp_st;  /* which pdp_st was the last
264                                  * to be processed */
265     unsigned long occu_pdp_st;  /* when was the pdp_st
266                                  * before the last update
267                                  * time */
268     unsigned long proc_pdp_age; /* how old was the data in
269                                  * the pdp prep area when it
270                                  * was last updated */
271     unsigned long occu_pdp_age; /* how long ago was the last
272                                  * pdp_step time */
273     rrd_value_t *pdp_new;   /* prepare the incoming data
274                              * to be added to the
275                              * existing entry */
276     rrd_value_t *pdp_temp;  /* prepare the pdp values
277                              * to be added to the
278                              * cdp values */
280     long     *tmpl_idx; /* index representing the settings
281                            transported by the tmplt index */
282     unsigned long tmpl_cnt = 2; /* time and data */
284     rrd_t     rrd;
285     time_t    current_time = 0;
286     time_t    rra_time = 0; /* time of update for a RRA */
287     unsigned long current_time_usec = 0;    /* microseconds part of current time */
288     struct timeval tmp_time;    /* used for time conversion */
290     char    **updvals;
291     int       schedule_smooth = 0;
292     rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
294     /* a vector of future Holt-Winters seasonal coefs */
295     unsigned long elapsed_pdp_st;
297     /* number of elapsed PDP steps since last update */
298     unsigned long *rra_step_cnt = NULL;
300     /* number of rows to be updated in an RRA for a data
301      * value. */
302     unsigned long start_pdp_offset;
304     /* number of PDP steps since the last update that
305      * are assigned to the first CDP to be generated
306      * since the last update. */
307     unsigned short scratch_idx;
309     /* index into the CDP scratch array */
310     enum cf_en current_cf;
312     /* numeric id of the current consolidation function */
313     rpnstack_t rpnstack;    /* used for COMPUTE DS */
314     int       version;  /* rrd version */
315     char     *endptr;   /* used in the conversion */
316     rrd_file_t *rrd_file;
318     rpnstack_init(&rpnstack);
320     /* need at least 1 arguments: data. */
321     if (argc < 1) {
322         rrd_set_error("Not enough arguments");
323         goto err_out;
324     }
326     rrd_file = rrd_open(filename, &rrd, RRD_READWRITE);
327     if (rrd_file == NULL) {
328         goto err_free;
329     }
330     /* We are now at the beginning of the rra's */
331     rra_current = rra_start = rra_begin = rrd_file->header_len;
333     /* initialize time */
334     version = atoi(rrd.stat_head->version);
335     gettimeofday(&tmp_time, 0);
336     normalize_time(&tmp_time);
337     current_time = tmp_time.tv_sec;
338     if (version >= 3) {
339         current_time_usec = tmp_time.tv_usec;
340     } else {
341         current_time_usec = 0;
342     }
344     /* get exclusive lock to whole file.
345      * lock gets removed when we close the file.
346      */
347     if (LockRRD(rrd_file->fd) != 0) {
348         rrd_set_error("could not lock RRD");
349         goto err_close;
350     }
352     if ((updvals =
353          malloc(sizeof(char *) * (rrd.stat_head->ds_cnt + 1))) == NULL) {
354         rrd_set_error("allocating updvals pointer array");
355         goto err_close;
356     }
358     if ((pdp_temp = malloc(sizeof(rrd_value_t)
359                            * rrd.stat_head->ds_cnt)) == NULL) {
360         rrd_set_error("allocating pdp_temp ...");
361         goto err_free_updvals;
362     }
364     if ((tmpl_idx = malloc(sizeof(unsigned long)
365                            * (rrd.stat_head->ds_cnt + 1))) == NULL) {
366         rrd_set_error("allocating tmpl_idx ...");
367         goto err_free_pdp_temp;
368     }
369     /* initialize tmplt redirector */
370     /* default config example (assume DS 1 is a CDEF DS)
371        tmpl_idx[0] -> 0; (time)
372        tmpl_idx[1] -> 1; (DS 0)
373        tmpl_idx[2] -> 3; (DS 2)
374        tmpl_idx[3] -> 4; (DS 3) */
375     tmpl_idx[0] = 0;    /* time */
376     for (i = 1, ii = 1; i <= rrd.stat_head->ds_cnt; i++) {
377         if (dst_conv(rrd.ds_def[i - 1].dst) != DST_CDEF)
378             tmpl_idx[ii++] = i;
379     }
380     tmpl_cnt = ii;
382     if (tmplt) {
383         /* we should work on a writeable copy here */
384         char     *dsname;
385         unsigned int tmpl_len;
386         char     *tmplt_copy = strdup(tmplt);
388         dsname = tmplt_copy;
389         tmpl_cnt = 1;   /* the first entry is the time */
390         tmpl_len = strlen(tmplt_copy);
391         for (i = 0; i <= tmpl_len; i++) {
392             if (tmplt_copy[i] == ':' || tmplt_copy[i] == '\0') {
393                 tmplt_copy[i] = '\0';
394                 if (tmpl_cnt > rrd.stat_head->ds_cnt) {
395                     rrd_set_error
396                         ("tmplt contains more DS definitions than RRD");
397                     goto err_free_tmpl_idx;
398                 }
399                 if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd, dsname)) == -1) {
400                     rrd_set_error("unknown DS name '%s'", dsname);
401                     goto err_free_tmpl_idx;
402                 } else {
403                     /* the first element is always the time */
404                     tmpl_idx[tmpl_cnt - 1]++;
405                     /* go to the next entry on the tmplt_copy */
406                     dsname = &tmplt_copy[i + 1];
407                     /* fix the damage we did before */
408                     if (i < tmpl_len) {
409                         tmplt_copy[i] = ':';
410                     }
412                 }
413             }
414         }
415         free(tmplt_copy);
416     }
417     if ((pdp_new = malloc(sizeof(rrd_value_t)
418                           * rrd.stat_head->ds_cnt)) == NULL) {
419         rrd_set_error("allocating pdp_new ...");
420         goto err_free_tmpl_idx;
421     }
422     /* loop through the arguments. */
423     for (arg_i = 0; arg_i < argc; arg_i++) {
424         char     *stepper = strdup(argv[arg_i]);
425         char     *step_start = stepper;
426         char     *p;
427         char     *parsetime_error = NULL;
428         enum { atstyle, normal } timesyntax;
429         struct rrd_time_value ds_tv;
431         if (stepper == NULL) {
432             rrd_set_error("failed duplication argv entry");
433             free(step_start);
434             goto err_free_pdp_new;
435         }
436         /* initialize all ds input to unknown except the first one
437            which has always got to be set */
438         for (ii = 1; ii <= rrd.stat_head->ds_cnt; ii++)
439             updvals[ii] = "U";
440         updvals[0] = stepper;
441         /* separate all ds elements; first must be examined separately
442            due to alternate time syntax */
443         if ((p = strchr(stepper, '@')) != NULL) {
444             timesyntax = atstyle;
445             *p = '\0';
446             stepper = p + 1;
447         } else if ((p = strchr(stepper, ':')) != NULL) {
448             timesyntax = normal;
449             *p = '\0';
450             stepper = p + 1;
451         } else {
452             rrd_set_error
453                 ("expected timestamp not found in data source from %s",
454                  argv[arg_i]);
455             free(step_start);
456             break;
457         }
458         ii = 1;
459         updvals[tmpl_idx[ii]] = stepper;
460         while (*stepper) {
461             if (*stepper == ':') {
462                 *stepper = '\0';
463                 ii++;
464                 if (ii < tmpl_cnt) {
465                     updvals[tmpl_idx[ii]] = stepper + 1;
466                 }
467             }
468             stepper++;
469         }
471         if (ii != tmpl_cnt - 1) {
472             rrd_set_error
473                 ("expected %lu data source readings (got %lu) from %s",
474                  tmpl_cnt - 1, ii, argv[arg_i]);
475             free(step_start);
476             break;
477         }
479         /* get the time from the reading ... handle N */
480         if (timesyntax == atstyle) {
481             if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
482                 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error);
483                 free(step_start);
484                 break;
485             }
486             if (ds_tv.type == RELATIVE_TO_END_TIME ||
487                 ds_tv.type == RELATIVE_TO_START_TIME) {
488                 rrd_set_error("specifying time relative to the 'start' "
489                               "or 'end' makes no sense here: %s", updvals[0]);
490                 free(step_start);
491                 break;
492             }
494             current_time = mktime(&ds_tv.tm) + ds_tv.offset;
496             current_time_usec = 0;  /* FIXME: how to handle usecs here ? */
498         } else if (strcmp(updvals[0], "N") == 0) {
499             gettimeofday(&tmp_time, 0);
500             normalize_time(&tmp_time);
501             current_time = tmp_time.tv_sec;
502             current_time_usec = tmp_time.tv_usec;
503         } else {
504             double    tmp;
505             char     *old_locale;
507             old_locale = setlocale(LC_NUMERIC, "C");
508             tmp = strtod(updvals[0], 0);
509             setlocale(LC_NUMERIC, old_locale);
510             current_time = floor(tmp);
511             current_time_usec =
512                 (long) ((tmp - (double) current_time) * 1000000.0);
513         }
514         /* dont do any correction for old version RRDs */
515         if (version < 3)
516             current_time_usec = 0;
518         if (current_time < rrd.live_head->last_up ||
519             (current_time == rrd.live_head->last_up &&
520              (long) current_time_usec <=
521              (long) rrd.live_head->last_up_usec)) {
522             rrd_set_error("illegal attempt to update using time %ld when "
523                           "last update time is %ld (minimum one second step)",
524                           current_time, rrd.live_head->last_up);
525             free(step_start);
526             break;
527         }
529         /* seek to the beginning of the rra's */
530         if (rra_current != rra_begin) {
531 #ifndef HAVE_MMAP
532             if (rrd_seek(rrd_file, rra_begin, SEEK_SET) != 0) {
533                 rrd_set_error("seek error in rrd");
534                 free(step_start);
535                 break;
536             }
537 #endif
538             rra_current = rra_begin;
539         }
540         rra_start = rra_begin;
542         /* when was the current pdp started */
543         proc_pdp_age = rrd.live_head->last_up % rrd.stat_head->pdp_step;
544         proc_pdp_st = rrd.live_head->last_up - proc_pdp_age;
546         /* when did the last pdp_st occur */
547         occu_pdp_age = current_time % rrd.stat_head->pdp_step;
548         occu_pdp_st = current_time - occu_pdp_age;
550         /* interval = current_time - rrd.live_head->last_up; */
551         interval = (double) (current_time - rrd.live_head->last_up)
552             + (double) ((long) current_time_usec -
553                         (long) rrd.live_head->last_up_usec) / 1000000.0;
555         if (occu_pdp_st > proc_pdp_st) {
556             /* OK we passed the pdp_st moment */
557             pre_int = (long) occu_pdp_st - rrd.live_head->last_up;  /* how much of the input data
558                                                                      * occurred before the latest
559                                                                      * pdp_st moment*/
560             pre_int -= ((double) rrd.live_head->last_up_usec) / 1000000.0;  /* adjust usecs */
561             post_int = occu_pdp_age;    /* how much after it */
562             post_int += ((double) current_time_usec) / 1000000.0;   /* adjust usecs */
563         } else {
564             pre_int = interval;
565             post_int = 0;
566         }
568 #ifdef DEBUG
569         printf("proc_pdp_age %lu\t"
570                "proc_pdp_st %lu\t"
571                "occu_pfp_age %lu\t"
572                "occu_pdp_st %lu\t"
573                "int %lf\t"
574                "pre_int %lf\t"
575                "post_int %lf\n", proc_pdp_age, proc_pdp_st,
576                occu_pdp_age, occu_pdp_st, interval, pre_int, post_int);
577 #endif
579         /* process the data sources and update the pdp_prep 
580          * area accordingly */
581         for (i = 0; i < rrd.stat_head->ds_cnt; i++) {
582             enum dst_en dst_idx;
584             dst_idx = dst_conv(rrd.ds_def[i].dst);
586             /* make sure we do not build diffs with old last_ds values */
587             if (rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt < interval) {
588                 strncpy(rrd.pdp_prep[i].last_ds, "U", LAST_DS_LEN - 1);
589                 rrd.pdp_prep[i].last_ds[LAST_DS_LEN - 1] = '\0';
590             }
592             /* NOTE: DST_CDEF should never enter this if block, because
593              * updvals[i+1][0] is initialized to 'U'; unless the caller
594              * accidentally specified a value for the DST_CDEF. To handle
595              * this case, an extra check is required. */
597             if ((updvals[i + 1][0] != 'U') &&
598                 (dst_idx != DST_CDEF) &&
599                 rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {
600                 double    rate = DNAN;
601                 char     *old_locale;
603                 /* the data source type defines how to process the data */
604                 /* pdp_new contains rate * time ... eg the bytes
605                  * transferred during the interval. Doing it this way saves
606                  * a lot of math operations */
607                 switch (dst_idx) {
608                 case DST_COUNTER:
609                 case DST_DERIVE:
610                     for (ii = 0; updvals[i + 1][ii] != '\0'; ii++) {
611                         if ((updvals[i + 1][ii] < '0'
612                              || updvals[i + 1][ii] > '9') && (ii != 0
613                                                               && updvals[i
614                                                                          + 1]
615                                                               [ii] != '-')) {
616                             rrd_set_error("not a simple integer: '%s'",
617                                           updvals[i + 1]);
618                             break;
619                         }
620                     }
621                     if (rrd_test_error()) {
622                         break;
623                     }
624                     if (rrd.pdp_prep[i].last_ds[0] != 'U') {
625                         pdp_new[i] =
626                             rrd_diff(updvals[i + 1], rrd.pdp_prep[i].last_ds);
627                         if (dst_idx == DST_COUNTER) {
628                             /* simple overflow catcher suggested by Andres Kroonmaa */
629                             /* this will fail terribly for non 32 or 64 bit counters ... */
630                             /* are there any others in SNMP land ? */
631                             if (pdp_new[i] < (double) 0.0)
632                                 pdp_new[i] += (double) 4294967296.0;    /* 2^32 */
633                             if (pdp_new[i] < (double) 0.0)
634                                 pdp_new[i] += (double) 18446744069414584320.0;
635                             /* 2^64-2^32 */ ;
636                         }
637                         rate = pdp_new[i] / interval;
638                     } else {
639                         pdp_new[i] = DNAN;
640                     }
641                     break;
642                 case DST_ABSOLUTE:
643                     old_locale = setlocale(LC_NUMERIC, "C");
644                     errno = 0;
645                     pdp_new[i] = strtod(updvals[i + 1], &endptr);
646                     setlocale(LC_NUMERIC, old_locale);
647                     if (errno > 0) {
648                         rrd_set_error("converting '%s' to float: %s",
649                                       updvals[i + 1], rrd_strerror(errno));
650                         break;
651                     };
652                     if (endptr[0] != '\0') {
653                         rrd_set_error
654                             ("conversion of '%s' to float not complete: tail '%s'",
655                              updvals[i + 1], endptr);
656                         break;
657                     }
658                     rate = pdp_new[i] / interval;
659                     break;
660                 case DST_GAUGE:
661                     errno = 0;
662                     old_locale = setlocale(LC_NUMERIC, "C");
663                     pdp_new[i] = strtod(updvals[i + 1], &endptr) * interval;
664                     setlocale(LC_NUMERIC, old_locale);
665                     if (errno > 0) {
666                         rrd_set_error("converting '%s' to float: %s",
667                                       updvals[i + 1], rrd_strerror(errno));
668                         break;
669                     };
670                     if (endptr[0] != '\0') {
671                         rrd_set_error
672                             ("conversion of '%s' to float not complete: tail '%s'",
673                              updvals[i + 1], endptr);
674                         break;
675                     }
676                     rate = pdp_new[i] / interval;
677                     break;
678                 default:
679                     rrd_set_error("rrd contains unknown DS type : '%s'",
680                                   rrd.ds_def[i].dst);
681                     break;
682                 }
683                 /* break out of this for loop if the error string is set */
684                 if (rrd_test_error()) {
685                     break;
686                 }
687                 /* make sure pdp_temp is neither too large or too small
688                  * if any of these occur it becomes unknown ...
689                  * sorry folks ... */
690                 if (!isnan(rate) &&
691                     ((!isnan(rrd.ds_def[i].par[DS_max_val].u_val) &&
692                       rate > rrd.ds_def[i].par[DS_max_val].u_val) ||
693                      (!isnan(rrd.ds_def[i].par[DS_min_val].u_val) &&
694                       rate < rrd.ds_def[i].par[DS_min_val].u_val))) {
695                     pdp_new[i] = DNAN;
696                 }
697             } else {
698                 /* no news is news all the same */
699                 pdp_new[i] = DNAN;
700             }
703             /* make a copy of the command line argument for the next run */
704 #ifdef DEBUG
705             fprintf(stderr,
706                     "prep ds[%lu]\t"
707                     "last_arg '%s'\t"
708                     "this_arg '%s'\t"
709                     "pdp_new %10.2f\n",
710                     i, rrd.pdp_prep[i].last_ds, updvals[i + 1], pdp_new[i]);
711 #endif
712             strncpy(rrd.pdp_prep[i].last_ds, updvals[i + 1], LAST_DS_LEN - 1);
713             rrd.pdp_prep[i].last_ds[LAST_DS_LEN - 1] = '\0';
714         }
715         /* break out of the argument parsing loop if the error_string is set */
716         if (rrd_test_error()) {
717             free(step_start);
718             break;
719         }
720         /* has a pdp_st moment occurred since the last run ? */
722         if (proc_pdp_st == occu_pdp_st) {
723             /* no we have not passed a pdp_st moment. therefore update is simple */
725             for (i = 0; i < rrd.stat_head->ds_cnt; i++) {
726                 if (isnan(pdp_new[i])) {
727                     /* this is not really accurate if we use subsecond data arrival time
728                        should have thought of it when going subsecond resolution ...
729                        sorry next format change we will have it! */
730                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt +=
731                         floor(interval);
732                 } else {
733                     if (isnan(rrd.pdp_prep[i].scratch[PDP_val].u_val)) {
734                         rrd.pdp_prep[i].scratch[PDP_val].u_val = pdp_new[i];
735                     } else {
736                         rrd.pdp_prep[i].scratch[PDP_val].u_val += pdp_new[i];
737                     }
738                 }
739 #ifdef DEBUG
740                 fprintf(stderr,
741                         "NO PDP  ds[%lu]\t"
742                         "value %10.2f\t"
743                         "unkn_sec %5lu\n",
744                         i,
745                         rrd.pdp_prep[i].scratch[PDP_val].u_val,
746                         rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
747 #endif
748             }
749         } else {
750             /* an pdp_st has occurred. */
752             /* in pdp_prep[].scratch[PDP_val].u_val we have collected
753                rate*seconds which occurred up to the last run.
754                pdp_new[] contains rate*seconds from the latest run.
755                pdp_temp[] will contain the rate for cdp */
757             for (i = 0; i < rrd.stat_head->ds_cnt; i++) {
758                 /* update pdp_prep to the current pdp_st. */
759                 double    pre_unknown = 0.0;
761                 if (isnan(pdp_new[i])) {
762                     /* a final bit of unknown to be added before calculation
763                        we use a temporary variable for this so that we
764                        don't have to turn integer lines before using the value */
765                     pre_unknown = pre_int;
766                 } else {
767                     if (isnan(rrd.pdp_prep[i].scratch[PDP_val].u_val)) {
768                         rrd.pdp_prep[i].scratch[PDP_val].u_val =
769                             pdp_new[i] / interval * pre_int;
770                     } else {
771                         rrd.pdp_prep[i].scratch[PDP_val].u_val +=
772                             pdp_new[i] / interval * pre_int;
773                     }
774                 }
777                 /* if too much of the pdp_prep is unknown we dump it */
778                 if (
779                        /* removed because this does not agree with the
780                           definition that a heartbeat can be unknown */
781                        /* (rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt
782                           > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) || */
783                        /* if the interval is larger than mrhb we get NAN */
784                        (interval > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) ||
785                        (occu_pdp_st - proc_pdp_st <=
786                         rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)) {
787                     pdp_temp[i] = DNAN;
788                 } else {
789                     pdp_temp[i] = rrd.pdp_prep[i].scratch[PDP_val].u_val
790                         / ((double) (occu_pdp_st - proc_pdp_st
791                                      -
792                                      rrd.pdp_prep[i].
793                                      scratch[PDP_unkn_sec_cnt].u_cnt)
794                            - pre_unknown);
795                 }
797                 /* process CDEF data sources; remember each CDEF DS can
798                  * only reference other DS with a lower index number */
799                 if (dst_conv(rrd.ds_def[i].dst) == DST_CDEF) {
800                     rpnp_t   *rpnp;
802                     rpnp =
803                         rpn_expand((rpn_cdefds_t *) &
804                                    (rrd.ds_def[i].par[DS_cdef]));
805                     /* substitute data values for OP_VARIABLE nodes */
806                     for (ii = 0; rpnp[ii].op != OP_END; ii++) {
807                         if (rpnp[ii].op == OP_VARIABLE) {
808                             rpnp[ii].op = OP_NUMBER;
809                             rpnp[ii].val = pdp_temp[rpnp[ii].ptr];
810                         }
811                     }
812                     /* run the rpn calculator */
813                     if (rpn_calc(rpnp, &rpnstack, 0, pdp_temp, i) == -1) {
814                         free(rpnp);
815                         break;  /* exits the data sources pdp_temp loop */
816                     }
817                 }
819                 /* make pdp_prep ready for the next run */
820                 if (isnan(pdp_new[i])) {
821                     /* this is not really accurate if we use subsecond data arrival time
822                        should have thought of it when going subsecond resolution ...
823                        sorry next format change we will have it! */
824                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt =
825                         floor(post_int);
826                     rrd.pdp_prep[i].scratch[PDP_val].u_val = DNAN;
827                 } else {
828                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = 0;
829                     rrd.pdp_prep[i].scratch[PDP_val].u_val =
830                         pdp_new[i] / interval * post_int;
831                 }
833 #ifdef DEBUG
834                 fprintf(stderr,
835                         "PDP UPD ds[%lu]\t"
836                         "pdp_temp %10.2f\t"
837                         "new_prep %10.2f\t"
838                         "new_unkn_sec %5lu\n",
839                         i, pdp_temp[i],
840                         rrd.pdp_prep[i].scratch[PDP_val].u_val,
841                         rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
842 #endif
843             }
845             /* if there were errors during the last loop, bail out here */
846             if (rrd_test_error()) {
847                 free(step_start);
848                 break;
849             }
851             /* compute the number of elapsed pdp_st moments */
852             elapsed_pdp_st =
853                 (occu_pdp_st - proc_pdp_st) / rrd.stat_head->pdp_step;
854 #ifdef DEBUG
855             fprintf(stderr, "elapsed PDP steps: %lu\n", elapsed_pdp_st);
856 #endif
857             if (rra_step_cnt == NULL) {
858                 rra_step_cnt = (unsigned long *)
859                     malloc((rrd.stat_head->rra_cnt) * sizeof(unsigned long));
860             }
862             for (i = 0, rra_start = rra_begin;
863                  i < rrd.stat_head->rra_cnt;
864                  rra_start +=
865                  rrd.rra_def[i].row_cnt * rrd.stat_head->ds_cnt *
866                  sizeof(rrd_value_t), i++) {
867                 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
868                 start_pdp_offset = rrd.rra_def[i].pdp_cnt -
869                     (proc_pdp_st / rrd.stat_head->pdp_step) %
870                     rrd.rra_def[i].pdp_cnt;
871                 if (start_pdp_offset <= elapsed_pdp_st) {
872                     rra_step_cnt[i] = (elapsed_pdp_st - start_pdp_offset) /
873                         rrd.rra_def[i].pdp_cnt + 1;
874                 } else {
875                     rra_step_cnt[i] = 0;
876                 }
878                 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
879                     /* If this is a bulk update, we need to skip ahead in
880                        the seasonal arrays so that they will be correct for
881                        the next observed value;
882                        note that for the bulk update itself, no update will
883                        occur to DEVSEASONAL or SEASONAL; furthermore, HWPREDICT
884                        and DEVPREDICT will be set to DNAN. */
885                     if (rra_step_cnt[i] > 2) {
886                         /* skip update by resetting rra_step_cnt[i],
887                            note that this is not data source specific; this is
888                            due to the bulk update, not a DNAN value for the
889                            specific data source. */
890                         rra_step_cnt[i] = 0;
891                         lookup_seasonal(&rrd, i, rra_start, rrd_file,
892                                         elapsed_pdp_st, &last_seasonal_coef);
893                         lookup_seasonal(&rrd, i, rra_start, rrd_file,
894                                         elapsed_pdp_st + 1, &seasonal_coef);
895                     }
897                     /* periodically run a smoother for seasonal effects */
898                     /* Need to use first cdp parameter buffer to track
899                      * burnin (burnin requires a specific smoothing schedule).
900                      * The CDP_init_seasonal parameter is really an RRA level,
901                      * not a data source within RRA level parameter, but the rra_def
902                      * is read only for rrd_update (not flushed to disk). */
903                     iii = i * (rrd.stat_head->ds_cnt);
904                     if (rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt
905                         <= BURNIN_CYCLES) {
906                         if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
907                             > rrd.rra_def[i].row_cnt - 1) {
908                             /* mark off one of the burnin cycles */
909                             ++(rrd.cdp_prep[iii].scratch[CDP_init_seasonal].
910                                u_cnt);
911                             schedule_smooth = 1;
912                         }
913                     } else {
914                         /* someone has no doubt invented a trick to deal with this
915                          * wrap around, but at least this code is clear. */
916                         if (rrd.rra_def[i].par[RRA_seasonal_smooth_idx].
917                             u_cnt > rrd.rra_ptr[i].cur_row) {
918                             /* here elapsed_pdp_st = rra_step_cnt[i] because of 1-1
919                              * mapping between PDP and CDP */
920                             if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
921                                 >=
922                                 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].
923                                 u_cnt) {
924 #ifdef DEBUG
925                                 fprintf(stderr,
926                                         "schedule_smooth 1: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
927                                         rrd.rra_ptr[i].cur_row,
928                                         elapsed_pdp_st,
929                                         rrd.rra_def[i].
930                                         par[RRA_seasonal_smooth_idx].u_cnt);
931 #endif
932                                 schedule_smooth = 1;
933                             }
934                         } else {
935                             /* can't rely on negative numbers because we are working with
936                              * unsigned values */
937                             /* Don't need modulus here. If we've wrapped more than once, only
938                              * one smooth is executed at the end. */
939                             if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st >=
940                                 rrd.rra_def[i].row_cnt
941                                 && rrd.rra_ptr[i].cur_row + elapsed_pdp_st -
942                                 rrd.rra_def[i].row_cnt >=
943                                 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].
944                                 u_cnt) {
945 #ifdef DEBUG
946                                 fprintf(stderr,
947                                         "schedule_smooth 2: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
948                                         rrd.rra_ptr[i].cur_row,
949                                         elapsed_pdp_st,
950                                         rrd.rra_def[i].
951                                         par[RRA_seasonal_smooth_idx].u_cnt);
952 #endif
953                                 schedule_smooth = 1;
954                             }
955                         }
956                     }
958                     rra_current = rrd_tell(rrd_file);
959                 }
960                 /* if cf is DEVSEASONAL or SEASONAL */
961                 if (rrd_test_error())
962                     break;
964                 /* update CDP_PREP areas */
965                 /* loop over data sources within each RRA */
966                 for (ii = 0; ii < rrd.stat_head->ds_cnt; ii++) {
968                     /* iii indexes the CDP prep area for this data source within the RRA */
969                     iii = i * rrd.stat_head->ds_cnt + ii;
971                     if (rrd.rra_def[i].pdp_cnt > 1) {
973                         if (rra_step_cnt[i] > 0) {
974                             /* If we are in this block, at least 1 CDP value will be written to
975                              * disk, this is the CDP_primary_val entry. If more than 1 value needs
976                              * to be written, then the "fill in" value is the CDP_secondary_val
977                              * entry. */
978                             if (isnan(pdp_temp[ii])) {
979                                 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
980                                     u_cnt += start_pdp_offset;
981                                 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
982                                     u_val = DNAN;
983                             } else {
984                                 /* CDP_secondary value is the RRA "fill in" value for intermediary
985                                  * CDP data entries. No matter the CF, the value is the same because
986                                  * the average, max, min, and last of a list of identical values is
987                                  * the same, namely, the value itself. */
988                                 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
989                                     u_val = pdp_temp[ii];
990                             }
992                             if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
993                                 u_cnt >
994                                 rrd.rra_def[i].pdp_cnt *
995                                 rrd.rra_def[i].par[RRA_cdp_xff_val].u_val) {
996                                 rrd.cdp_prep[iii].scratch[CDP_primary_val].
997                                     u_val = DNAN;
998                                 /* initialize carry over */
999                                 if (current_cf == CF_AVERAGE) {
1000                                     if (isnan(pdp_temp[ii])) {
1001                                         rrd.cdp_prep[iii].scratch[CDP_val].
1002                                             u_val = DNAN;
1003                                     } else {
1004                                         rrd.cdp_prep[iii].scratch[CDP_val].
1005                                             u_val =
1006                                             pdp_temp[ii] *
1007                                             ((elapsed_pdp_st -
1008                                               start_pdp_offset) %
1009                                              rrd.rra_def[i].pdp_cnt);
1010                                     }
1011                                 } else {
1012                                     rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1013                                         pdp_temp[ii];
1014                                 }
1015                             } else {
1016                                 rrd_value_t cum_val, cur_val;
1018                                 switch (current_cf) {
1019                                 case CF_AVERAGE:
1020                                     cum_val =
1021                                         IFDNAN(rrd.cdp_prep[iii].
1022                                                scratch[CDP_val].u_val, 0.0);
1023                                     cur_val = IFDNAN(pdp_temp[ii], 0.0);
1024                                     rrd.cdp_prep[iii].
1025                                         scratch[CDP_primary_val].u_val =
1026                                         (cum_val +
1027                                          cur_val * start_pdp_offset) /
1028                                         (rrd.rra_def[i].pdp_cnt -
1029                                          rrd.cdp_prep[iii].
1030                                          scratch[CDP_unkn_pdp_cnt].u_cnt);
1031                                     /* initialize carry over value */
1032                                     if (isnan(pdp_temp[ii])) {
1033                                         rrd.cdp_prep[iii].scratch[CDP_val].
1034                                             u_val = DNAN;
1035                                     } else {
1036                                         rrd.cdp_prep[iii].scratch[CDP_val].
1037                                             u_val =
1038                                             pdp_temp[ii] *
1039                                             ((elapsed_pdp_st -
1040                                               start_pdp_offset) %
1041                                              rrd.rra_def[i].pdp_cnt);
1042                                     }
1043                                     break;
1044                                 case CF_MAXIMUM:
1045                                     cum_val =
1046                                         IFDNAN(rrd.cdp_prep[iii].
1047                                                scratch[CDP_val].u_val, -DINF);
1048                                     cur_val = IFDNAN(pdp_temp[ii], -DINF);
1049 #ifdef DEBUG
1050                                     if (isnan
1051                                         (rrd.cdp_prep[iii].scratch[CDP_val].
1052                                          u_val) && isnan(pdp_temp[ii])) {
1053                                         fprintf(stderr,
1054                                                 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1055                                                 i, ii);
1056                                         exit(-1);
1057                                     }
1058 #endif
1059                                     if (cur_val > cum_val)
1060                                         rrd.cdp_prep[iii].
1061                                             scratch[CDP_primary_val].u_val =
1062                                             cur_val;
1063                                     else
1064                                         rrd.cdp_prep[iii].
1065                                             scratch[CDP_primary_val].u_val =
1066                                             cum_val;
1067                                     /* initialize carry over value */
1068                                     rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1069                                         pdp_temp[ii];
1070                                     break;
1071                                 case CF_MINIMUM:
1072                                     cum_val =
1073                                         IFDNAN(rrd.cdp_prep[iii].
1074                                                scratch[CDP_val].u_val, DINF);
1075                                     cur_val = IFDNAN(pdp_temp[ii], DINF);
1076 #ifdef DEBUG
1077                                     if (isnan
1078                                         (rrd.cdp_prep[iii].scratch[CDP_val].
1079                                          u_val) && isnan(pdp_temp[ii])) {
1080                                         fprintf(stderr,
1081                                                 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1082                                                 i, ii);
1083                                         exit(-1);
1084                                     }
1085 #endif
1086                                     if (cur_val < cum_val)
1087                                         rrd.cdp_prep[iii].
1088                                             scratch[CDP_primary_val].u_val =
1089                                             cur_val;
1090                                     else
1091                                         rrd.cdp_prep[iii].
1092                                             scratch[CDP_primary_val].u_val =
1093                                             cum_val;
1094                                     /* initialize carry over value */
1095                                     rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1096                                         pdp_temp[ii];
1097                                     break;
1098                                 case CF_LAST:
1099                                 default:
1100                                     rrd.cdp_prep[iii].
1101                                         scratch[CDP_primary_val].u_val =
1102                                         pdp_temp[ii];
1103                                     /* initialize carry over value */
1104                                     rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1105                                         pdp_temp[ii];
1106                                     break;
1107                                 }
1108                             }   /* endif meets xff value requirement for a valid value */
1109                             /* initialize carry over CDP_unkn_pdp_cnt, this must happen after CDP_primary_val
1110                              * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1111                             if (isnan(pdp_temp[ii]))
1112                                 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
1113                                     u_cnt =
1114                                     (elapsed_pdp_st -
1115                                      start_pdp_offset) %
1116                                     rrd.rra_def[i].pdp_cnt;
1117                             else
1118                                 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
1119                                     u_cnt = 0;
1120                         } else {    /* rra_step_cnt[i]  == 0 */
1122 #ifdef DEBUG
1123                             if (isnan
1124                                 (rrd.cdp_prep[iii].scratch[CDP_val].u_val)) {
1125                                 fprintf(stderr,
1126                                         "schedule CDP_val update, RRA %lu DS %lu, DNAN\n",
1127                                         i, ii);
1128                             } else {
1129                                 fprintf(stderr,
1130                                         "schedule CDP_val update, RRA %lu DS %lu, %10.2f\n",
1131                                         i, ii,
1132                                         rrd.cdp_prep[iii].scratch[CDP_val].
1133                                         u_val);
1134                             }
1135 #endif
1136                             if (isnan(pdp_temp[ii])) {
1137                                 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
1138                                     u_cnt += elapsed_pdp_st;
1139                             } else
1140                                 if (isnan
1141                                     (rrd.cdp_prep[iii].scratch[CDP_val].
1142                                      u_val)) {
1143                                 if (current_cf == CF_AVERAGE) {
1144                                     rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1145                                         pdp_temp[ii] * elapsed_pdp_st;
1146                                 } else {
1147                                     rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1148                                         pdp_temp[ii];
1149                                 }
1150 #ifdef DEBUG
1151                                 fprintf(stderr,
1152                                         "Initialize CDP_val for RRA %lu DS %lu: %10.2f\n",
1153                                         i, ii,
1154                                         rrd.cdp_prep[iii].scratch[CDP_val].
1155                                         u_val);
1156 #endif
1157                             } else {
1158                                 switch (current_cf) {
1159                                 case CF_AVERAGE:
1160                                     rrd.cdp_prep[iii].scratch[CDP_val].
1161                                         u_val +=
1162                                         pdp_temp[ii] * elapsed_pdp_st;
1163                                     break;
1164                                 case CF_MINIMUM:
1165                                     if (pdp_temp[ii] <
1166                                         rrd.cdp_prep[iii].scratch[CDP_val].
1167                                         u_val)
1168                                         rrd.cdp_prep[iii].scratch[CDP_val].
1169                                             u_val = pdp_temp[ii];
1170                                     break;
1171                                 case CF_MAXIMUM:
1172                                     if (pdp_temp[ii] >
1173                                         rrd.cdp_prep[iii].scratch[CDP_val].
1174                                         u_val)
1175                                         rrd.cdp_prep[iii].scratch[CDP_val].
1176                                             u_val = pdp_temp[ii];
1177                                     break;
1178                                 case CF_LAST:
1179                                 default:
1180                                     rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1181                                         pdp_temp[ii];
1182                                     break;
1183                                 }
1184                             }
1185                         }
1186                     } else {    /* rrd.rra_def[i].pdp_cnt == 1 */
1187                         if (elapsed_pdp_st > 2) {
1188                             switch (current_cf) {
1189                             case CF_AVERAGE:
1190                             default:
1191                                 rrd.cdp_prep[iii].scratch[CDP_primary_val].
1192                                     u_val = pdp_temp[ii];
1193                                 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
1194                                     u_val = pdp_temp[ii];
1195                                 break;
1196                             case CF_SEASONAL:
1197                             case CF_DEVSEASONAL:
1198                                 /* need to update cached seasonal values, so they are consistent
1199                                  * with the bulk update */
1200                                 /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1201                                  * CDP_last_deviation are the same. */
1202                                 rrd.cdp_prep[iii].
1203                                     scratch[CDP_hw_last_seasonal].u_val =
1204                                     last_seasonal_coef[ii];
1205                                 rrd.cdp_prep[iii].scratch[CDP_hw_seasonal].
1206                                     u_val = seasonal_coef[ii];
1207                                 break;
1208                             case CF_HWPREDICT:
1209                             case CF_MHWPREDICT:
1210                                 /* need to update the null_count and last_null_count.
1211                                  * even do this for non-DNAN pdp_temp because the
1212                                  * algorithm is not learning from batch updates. */
1213                                 rrd.cdp_prep[iii].scratch[CDP_null_count].
1214                                     u_cnt += elapsed_pdp_st;
1215                                 rrd.cdp_prep[iii].
1216                                     scratch[CDP_last_null_count].u_cnt +=
1217                                     elapsed_pdp_st - 1;
1218                                 /* fall through */
1219                             case CF_DEVPREDICT:
1220                                 rrd.cdp_prep[iii].scratch[CDP_primary_val].
1221                                     u_val = DNAN;
1222                                 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
1223                                     u_val = DNAN;
1224                                 break;
1225                             case CF_FAILURES:
1226                                 /* do not count missed bulk values as failures */
1227                                 rrd.cdp_prep[iii].scratch[CDP_primary_val].
1228                                     u_val = 0;
1229                                 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
1230                                     u_val = 0;
1231                                 /* need to reset violations buffer.
1232                                  * could do this more carefully, but for now, just
1233                                  * assume a bulk update wipes away all violations. */
1234                                 erase_violations(&rrd, iii, i);
1235                                 break;
1236                             }
1237                         }
1238                     }   /* endif rrd.rra_def[i].pdp_cnt == 1 */
1240                     if (rrd_test_error())
1241                         break;
1243                 }       /* endif data sources loop */
1244             }           /* end RRA Loop */
1246             /* this loop is only entered if elapsed_pdp_st < 3 */
1247             for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1248                  j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val) {
1249                 for (i = 0, rra_start = rra_begin;
1250                      i < rrd.stat_head->rra_cnt;
1251                      rra_start +=
1252                      rrd.rra_def[i].row_cnt * rrd.stat_head->ds_cnt *
1253                      sizeof(rrd_value_t), i++) {
1254                     if (rrd.rra_def[i].pdp_cnt > 1)
1255                         continue;
1257                     current_cf = cf_conv(rrd.rra_def[i].cf_nam);
1258                     if (current_cf == CF_SEASONAL
1259                         || current_cf == CF_DEVSEASONAL) {
1260                         lookup_seasonal(&rrd, i, rra_start, rrd_file,
1261                                         elapsed_pdp_st + (scratch_idx ==
1262                                                           CDP_primary_val ? 1
1263                                                           : 2),
1264                                         &seasonal_coef);
1265                         rra_current = rrd_tell(rrd_file);
1266                     }
1267                     if (rrd_test_error())
1268                         break;
1269                     /* loop over data sources within each RRA */
1270                     for (ii = 0; ii < rrd.stat_head->ds_cnt; ii++) {
1271                         update_aberrant_CF(&rrd, pdp_temp[ii], current_cf,
1272                                            i * (rrd.stat_head->ds_cnt) + ii,
1273                                            i, ii, scratch_idx, seasonal_coef);
1274                     }
1275                 }       /* end RRA Loop */
1276                 if (rrd_test_error())
1277                     break;
1278             }           /* end elapsed_pdp_st loop */
1280             if (rrd_test_error())
1281                 break;
1283             /* Ready to write to disk */
1284             /* Move sequentially through the file, writing one RRA at a time.
1285              * Note this architecture divorces the computation of CDP with
1286              * flushing updated RRA entries to disk. */
1287             for (i = 0, rra_start = rra_begin;
1288                  i < rrd.stat_head->rra_cnt;
1289                  rra_start +=
1290                  rrd.rra_def[i].row_cnt * rrd.stat_head->ds_cnt *
1291                  sizeof(rrd_value_t), i++) {
1292                 /* is there anything to write for this RRA? If not, continue. */
1293                 if (rra_step_cnt[i] == 0)
1294                     continue;
1296                 /* write the first row */
1297 #ifdef DEBUG
1298                 fprintf(stderr, "  -- RRA Preseek %ld\n", rrd_file->pos);
1299 #endif
1300                 rrd.rra_ptr[i].cur_row++;
1301                 if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
1302                     rrd.rra_ptr[i].cur_row = 0; /* wrap around */
1303                 /* position on the first row */
1304                 rra_pos_tmp = rra_start +
1305                     (rrd.stat_head->ds_cnt) * (rrd.rra_ptr[i].cur_row) *
1306                     sizeof(rrd_value_t);
1307                 if (rra_pos_tmp != rra_current) {
1308                     if (rrd_seek(rrd_file, rra_pos_tmp, SEEK_SET) != 0) {
1309                         rrd_set_error("seek error in rrd");
1310                         break;
1311                     }
1312                     rra_current = rra_pos_tmp;
1313                 }
1314 #ifdef DEBUG
1315                 fprintf(stderr, "  -- RRA Postseek %ld\n", rrd_file->pos);
1316 #endif
1317                 scratch_idx = CDP_primary_val;
1318                 if (pcdp_summary != NULL) {
1319                     rra_time = (current_time - current_time
1320                                 % (rrd.rra_def[i].pdp_cnt *
1321                                    rrd.stat_head->pdp_step))
1322                         -
1323                         ((rra_step_cnt[i] -
1324                           1) * rrd.rra_def[i].pdp_cnt *
1325                          rrd.stat_head->pdp_step);
1326                 }
1327                 pcdp_summary =
1328                     write_RRA_row(rrd_file, &rrd, i, &rra_current,
1329                                   scratch_idx, pcdp_summary, &rra_time);
1330                 if (rrd_test_error())
1331                     break;
1333                 /* write other rows of the bulk update, if any */
1334                 scratch_idx = CDP_secondary_val;
1335                 for (; rra_step_cnt[i] > 1; rra_step_cnt[i]--) {
1336                     if (++rrd.rra_ptr[i].cur_row == rrd.rra_def[i].row_cnt) {
1337 #ifdef DEBUG
1338                         fprintf(stderr,
1339                                 "Wraparound for RRA %s, %lu updates left\n",
1340                                 rrd.rra_def[i].cf_nam, rra_step_cnt[i] - 1);
1341 #endif
1342                         /* wrap */
1343                         rrd.rra_ptr[i].cur_row = 0;
1344                         /* seek back to beginning of current rra */
1345                         if (rrd_seek(rrd_file, rra_start, SEEK_SET) != 0) {
1346                             rrd_set_error("seek error in rrd");
1347                             break;
1348                         }
1349 #ifdef DEBUG
1350                         fprintf(stderr, "  -- Wraparound Postseek %ld\n",
1351                                 rrd_file->pos);
1352 #endif
1353                         rra_current = rra_start;
1354                     }
1355                     if (pcdp_summary != NULL) {
1356                         rra_time = (current_time - current_time
1357                                     % (rrd.rra_def[i].pdp_cnt *
1358                                        rrd.stat_head->pdp_step))
1359                             -
1360                             ((rra_step_cnt[i] -
1361                               2) * rrd.rra_def[i].pdp_cnt *
1362                              rrd.stat_head->pdp_step);
1363                     }
1364                     pcdp_summary =
1365                         write_RRA_row(rrd_file, &rrd, i, &rra_current,
1366                                       scratch_idx, pcdp_summary, &rra_time);
1367                 }
1369                 if (rrd_test_error())
1370                     break;
1371             }           /* RRA LOOP */
1373             /* break out of the argument parsing loop if error_string is set */
1374             if (rrd_test_error()) {
1375                 free(step_start);
1376                 break;
1377             }
1379         }               /* endif a pdp_st has occurred */
1380         rrd.live_head->last_up = current_time;
1381         rrd.live_head->last_up_usec = current_time_usec;
1382         free(step_start);
1383     }                   /* function argument loop */
1385     if (seasonal_coef != NULL)
1386         free(seasonal_coef);
1387     if (last_seasonal_coef != NULL)
1388         free(last_seasonal_coef);
1389     if (rra_step_cnt != NULL)
1390         free(rra_step_cnt);
1391     rpnstack_free(&rpnstack);
1393 #if 0
1394     //rrd_flush(rrd_file);    //XXX: really needed?
1395 #endif
1396     /* if we got here and if there is an error and if the file has not been
1397      * written to, then close things up and return. */
1398     if (rrd_test_error()) {
1399         goto err_free_pdp_new;
1400     }
1402     /* aargh ... that was tough ... so many loops ... anyway, it's done.
1403      * we just need to write back the live header portion now */
1405     if (rrd_seek(rrd_file, (sizeof(stat_head_t)
1406                             + sizeof(ds_def_t) * rrd.stat_head->ds_cnt
1407                             + sizeof(rra_def_t) * rrd.stat_head->rra_cnt),
1408                  SEEK_SET) != 0) {
1409         rrd_set_error("seek rrd for live header writeback");
1410         goto err_free_pdp_new;
1411     }
1412     /* for mmap, we did already write to the underlying mapping, so we do
1413        not need to write again.  */
1414 #ifndef HAVE_MMAP
1415     if (version >= 3) {
1416         if (rrd_write(rrd_file, rrd.live_head,
1417                       sizeof(live_head_t) * 1) != sizeof(live_head_t) * 1) {
1418             rrd_set_error("rrd_write live_head to rrd");
1419             goto err_free_pdp_new;
1420         }
1421     } else {
1422         if (rrd_write(rrd_file, &rrd.live_head->last_up,
1423                       sizeof(time_t) * 1) != sizeof(time_t) * 1) {
1424             rrd_set_error("rrd_write live_head to rrd");
1425             goto err_free_pdp_new;
1426         }
1427     }
1430     if (rrd_write(rrd_file, rrd.pdp_prep,
1431                   sizeof(pdp_prep_t) * rrd.stat_head->ds_cnt)
1432         != (ssize_t) (sizeof(pdp_prep_t) * rrd.stat_head->ds_cnt)) {
1433         rrd_set_error("rrd_write pdp_prep to rrd");
1434         goto err_free_pdp_new;
1435     }
1437     if (rrd_write(rrd_file, rrd.cdp_prep,
1438                   sizeof(cdp_prep_t) * rrd.stat_head->rra_cnt *
1439                   rrd.stat_head->ds_cnt)
1440         != (ssize_t) (sizeof(cdp_prep_t) * rrd.stat_head->rra_cnt *
1441                       rrd.stat_head->ds_cnt)) {
1443         rrd_set_error("rrd_write cdp_prep to rrd");
1444         goto err_free_pdp_new;
1445     }
1447     if (rrd_write(rrd_file, rrd.rra_ptr,
1448                   sizeof(rra_ptr_t) * rrd.stat_head->rra_cnt)
1449         != (ssize_t) (sizeof(rra_ptr_t) * rrd.stat_head->rra_cnt)) {
1450         rrd_set_error("rrd_write rra_ptr to rrd");
1451         goto err_free_pdp_new;
1452     }
1453 #endif
1455     /* rrd_flush(rrd_file); */
1457     /* calling the smoothing code here guarantees at most
1458      * one smoothing operation per rrd_update call. Unfortunately,
1459      * it is possible with bulk updates, or a long-delayed update
1460      * for smoothing to occur off-schedule. This really isn't
1461      * critical except during the burning cycles. */
1462     if (schedule_smooth) {
1464         rra_start = rra_begin;
1465         for (i = 0; i < rrd.stat_head->rra_cnt; ++i) {
1466             if (cf_conv(rrd.rra_def[i].cf_nam) == CF_DEVSEASONAL ||
1467                 cf_conv(rrd.rra_def[i].cf_nam) == CF_SEASONAL) {
1468 #ifdef DEBUG
1469                 fprintf(stderr, "Running smoother for rra %ld\n", i);
1470 #endif
1471                 apply_smoother(&rrd, i, rra_start, rrd_file);
1472                 if (rrd_test_error())
1473                     break;
1474             }
1475             rra_start += rrd.rra_def[i].row_cnt
1476                 * rrd.stat_head->ds_cnt * sizeof(rrd_value_t);
1477         }
1478     }
1480 /*    rrd_dontneed(rrd_file,&rrd); */
1481     rrd_free(&rrd);
1482     rrd_close(rrd_file);
1484     free(pdp_new);
1485     free(tmpl_idx);
1486     free(pdp_temp);
1487     free(updvals);
1488     return (0);
1490   err_free_pdp_new:
1491     free(pdp_new);
1492   err_free_tmpl_idx:
1493     free(tmpl_idx);
1494   err_free_pdp_temp:
1495     free(pdp_temp);
1496   err_free_updvals:
1497     free(updvals);
1498   err_close:
1499     rrd_close(rrd_file);
1500   err_free:
1501     rrd_free(&rrd);
1502   err_out:
1503     return (-1);
1506 /*
1507  * get exclusive lock to whole file.
1508  * lock gets removed when we close the file
1509  *
1510  * returns 0 on success
1511  */
1512 int LockRRD(
1513     int in_file)
1515     int       rcstat;
1517     {
1518 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
1519         struct _stat st;
1521         if (_fstat(in_file, &st) == 0) {
1522             rcstat = _locking(in_file, _LK_NBLCK, st.st_size);
1523         } else {
1524             rcstat = -1;
1525         }
1526 #else
1527         struct flock lock;
1529         lock.l_type = F_WRLCK;  /* exclusive write lock */
1530         lock.l_len = 0; /* whole file */
1531         lock.l_start = 0;   /* start of file */
1532         lock.l_whence = SEEK_SET;   /* l_start is relative to start of file */
1534         rcstat = fcntl(in_file, F_SETLK, &lock);
1535 #endif
1536     }
1538     return (rcstat);