Code

Imported upstream version 1.2.28.
[pkg-rrdtool.git] / src / rrd_update.c
1 /*****************************************************************************
2  * RRDtool 1.2.28  Copyright by Tobi Oetiker, 1997-2008
3  *****************************************************************************
4  * rrd_update.c  RRD Update Function
5  *****************************************************************************
6  * $Id: rrd_update.c 1450 2008-07-23 13:45:41Z oetiker $
7  *****************************************************************************/
9 #include "rrd_tool.h"
10 #include <sys/types.h>
11 #include <fcntl.h>
13 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
14  #include <sys/locking.h>
15  #include <sys/stat.h>
16  #include <io.h>
17 #endif
19 #include "rrd_hw.h"
20 #include "rrd_rpncalc.h"
22 #include "rrd_is_thread_safe.h"
23 #include "unused.h"
25 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
26 /*
27  * WIN32 does not have gettimeofday     and struct timeval. This is a quick and dirty
28  * replacement.
29  */
30 #include <sys/timeb.h>
32 #if (defined(__MINGW32__) && \
33        ((__MINGW32_MAJOR_VERSION == 3 && __MINGW32_MINOR_VERSION >= 12) || __MINGW32_MAJOR_VERSION > 3))
34 #include <sys/time.h>
35 #else
37 struct timeval {
38         time_t tv_sec; /* seconds */
39         long tv_usec;  /* microseconds */
40 };
42 struct __timezone {
43         int  tz_minuteswest; /* minutes W of Greenwich */
44         int  tz_dsttime;     /* type of dst correction */
45 };
47 static int gettimeofday(struct timeval *t, struct __timezone *tz) {
49         struct _timeb current_time;
51         _ftime(&current_time);
53         t->tv_sec  = current_time.time;
54         t->tv_usec = current_time.millitm * 1000;
56         return 0;
57 }
59 #endif /* mingw32 3.4.5 */
60 #endif
62 /*
63  * normilize time as returned by gettimeofday. usec part must
64  * be always >= 0
65  */
66 static void normalize_time(struct timeval *t)
67 {
68         if(t->tv_usec < 0) {
69                 t->tv_sec--;
70                 t->tv_usec += 1000000L;
71         }
72 }
74 /* Local prototypes */
75 int LockRRD(FILE *rrd_file);
76 #ifdef HAVE_MMAP
77 info_t *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, 
78                                         unsigned long *rra_current,
79                                         unsigned short CDP_scratch_idx,
80 #ifndef DEBUG
81 FILE UNUSED(*rrd_file),
82 #else
83 FILE *rrd_file,
84 #endif
85                                         info_t *pcdp_summary, time_t *rra_time, void *rrd_mmaped_file);
86 #else
87 info_t *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, 
88                                         unsigned long *rra_current,
89                                         unsigned short CDP_scratch_idx, FILE *rrd_file,
90                                         info_t *pcdp_summary, time_t *rra_time);
91 #endif
92 int rrd_update_r(const char *filename, const char *tmplt, int argc, const char **argv);
93 int _rrd_update(const char *filename, const char *tmplt, int argc, const char **argv, 
94                                         info_t*);
96 #define IFDNAN(X,Y) (isnan(X) ? (Y) : (X));
99 info_t *rrd_update_v(int argc, char **argv)
101     char             *tmplt = NULL;          
102         info_t *result = NULL;
103         infoval rc;
104       rc.u_int = -1;
105     optind = 0; opterr = 0;  /* initialize getopt */
107     while (1) {
108                 static struct option long_options[] =
109                         {
110                                 {"template",      required_argument, 0, 't'},
111                                 {0,0,0,0}
112                         };
113                 int option_index = 0;
114                 int opt;
115                 opt = getopt_long(argc, argv, "t:", 
116                                                   long_options, &option_index);
117                 
118                 if (opt == EOF)
119                         break;
120                 
121                 switch(opt) {
122                 case 't':
123                         tmplt = optarg;
124                         break;
125                 
126                 case '?':
127                         rrd_set_error("unknown option '%s'",argv[optind-1]);
128                         goto end_tag;
129                 }
130     }
132     /* need at least 2 arguments: filename, data. */
133     if (argc-optind < 2) {
134                 rrd_set_error("Not enough arguments");
135                 goto end_tag;
136     }
137     rc.u_int = 0;
138     result = info_push(NULL,sprintf_alloc("return_value"),RD_I_INT,rc);
139         rc.u_int = _rrd_update(argv[optind], tmplt,
140                       argc - optind - 1, (const char **)(argv + optind + 1), result);
141     result->value.u_int = rc.u_int;
142 end_tag:
143     return result;
146 int
147 rrd_update(int argc, char **argv)
149     char             *tmplt = NULL;          
150     int rc;
151     optind = 0; opterr = 0;  /* initialize getopt */
153     while (1) {
154                 static struct option long_options[] =
155                         {
156                                 {"template",      required_argument, 0, 't'},
157                                 {0,0,0,0}
158                         };
159                 int option_index = 0;
160                 int opt;
161                 opt = getopt_long(argc, argv, "t:", 
162                                                   long_options, &option_index);
163                 
164                 if (opt == EOF)
165                         break;
166                 
167                 switch(opt) {
168                 case 't':
169                         tmplt = optarg;
170                         break;
171                 
172                 case '?':
173                         rrd_set_error("unknown option '%s'",argv[optind-1]);
174                         return(-1);
175                 }
176     }
178     /* need at least 2 arguments: filename, data. */
179     if (argc-optind < 2) {
180                 rrd_set_error("Not enough arguments");
182                 return -1;
183     }
184  
185         rc = rrd_update_r(argv[optind], tmplt,
186                       argc - optind - 1, (const char **)(argv + optind + 1));
187     return rc;
190 int
191 rrd_update_r(const char *filename, const char *tmplt, int argc, const char **argv)
193    return _rrd_update(filename, tmplt, argc, argv, NULL);
196 int
197 _rrd_update(const char *filename, const char *tmplt, int argc, const char **argv, 
198    info_t *pcdp_summary)
201     int              arg_i = 2;
202     short            j;
203     unsigned long    i,ii,iii=1;
205     unsigned long    rra_begin;          /* byte pointer to the rra
206                                           * area in the rrd file.  this
207                                           * pointer never changes value */
208     unsigned long    rra_start;          /* byte pointer to the rra
209                                           * area in the rrd file.  this
210                                           * pointer changes as each rrd is
211                                           * processed. */
212     unsigned long    rra_current;        /* byte pointer to the current write
213                                           * spot in the rrd file. */
214     unsigned long    rra_pos_tmp;        /* temporary byte pointer. */
215     double           interval,
216                      pre_int,post_int;   /* interval between this and
217                                           * the last run */
218     unsigned long    proc_pdp_st;        /* which pdp_st was the last
219                                           * to be processed */
220     unsigned long    occu_pdp_st;        /* when was the pdp_st
221                                           * before the last update
222                                           * time */
223     unsigned long    proc_pdp_age;       /* how old was the data in
224                                           * the pdp prep area when it
225                                           * was last updated */
226     unsigned long    occu_pdp_age;       /* how long ago was the last
227                                           * pdp_step time */
228     rrd_value_t      *pdp_new;           /* prepare the incoming data
229                                           * to be added the the
230                                           * existing entry */
231     rrd_value_t      *pdp_temp;          /* prepare the pdp values 
232                                           * to be added the the
233                                           * cdp values */
235     long             *tmpl_idx;          /* index representing the settings
236                                             transported by the tmplt index */
237     unsigned long    tmpl_cnt = 2;       /* time and data */
239     FILE             *rrd_file;
240     rrd_t            rrd;
241     time_t           current_time = 0;
242     time_t           rra_time = 0;       /* time of update for a RRA */
243     unsigned long    current_time_usec=0;/* microseconds part of current time */
244     struct timeval   tmp_time;           /* used for time conversion */
246     char             **updvals;
247     int              schedule_smooth = 0;
248         rrd_value_t      *seasonal_coef = NULL, *last_seasonal_coef = NULL;
249                                          /* a vector of future Holt-Winters seasonal coefs */
250     unsigned long    elapsed_pdp_st;
251                                          /* number of elapsed PDP steps since last update */
252     unsigned long    *rra_step_cnt = NULL;
253                                          /* number of rows to be updated in an RRA for a data
254                                           * value. */
255     unsigned long    start_pdp_offset;
256                                          /* number of PDP steps since the last update that
257                                           * are assigned to the first CDP to be generated
258                                           * since the last update. */
259     unsigned short   scratch_idx;
260                                          /* index into the CDP scratch array */
261     enum cf_en       current_cf;
262                                          /* numeric id of the current consolidation function */
263     rpnstack_t       rpnstack; /* used for COMPUTE DS */
264     int              version;  /* rrd version */
265     char             *endptr; /* used in the conversion */
267 #ifdef HAVE_MMAP
268     void             *rrd_mmaped_file;
269     unsigned long    rrd_filesize;
270 #endif
273     rpnstack_init(&rpnstack);
275     /* need at least 1 arguments: data. */
276     if (argc < 1) {
277         rrd_set_error("Not enough arguments");
278         return -1;
279     }
280     
281     
283     if(rrd_open(filename,&rrd_file,&rrd, RRD_READWRITE)==-1){
284         return -1;
285     }
287     /* initialize time */
288     version = atoi(rrd.stat_head->version);
289     gettimeofday(&tmp_time, 0);
290     normalize_time(&tmp_time);
291     current_time = tmp_time.tv_sec;
292     if(version >= 3) {
293         current_time_usec = tmp_time.tv_usec;
294     }
295     else {
296         current_time_usec = 0;
297     }
299     rra_current = rra_start = rra_begin = ftell(rrd_file);
300     /* This is defined in the ANSI C standard, section 7.9.5.3:
302         When a file is opened with udpate mode ('+' as the second
303         or third character in the ... list of mode argument
304         variables), both input and ouptut may be performed on the
305         associated stream.  However, ...  input may not be directly
306         followed by output without an intervening call to a file
307         positioning function, unless the input oepration encounters
308         end-of-file. */
309 #ifdef HAVE_MMAP
310     fseek(rrd_file, 0, SEEK_END);
311     rrd_filesize = ftell(rrd_file);
312     fseek(rrd_file, rra_current, SEEK_SET);
313 #else
314     fseek(rrd_file, 0, SEEK_CUR);
315 #endif
317     
318     /* get exclusive lock to whole file.
319      * lock gets removed when we close the file.
320      */
321     if (LockRRD(rrd_file) != 0) {
322       rrd_set_error("could not lock RRD");
323       rrd_free(&rrd);
324       fclose(rrd_file);
325       return(-1);   
326     } 
328     if((updvals = malloc( sizeof(char*) * (rrd.stat_head->ds_cnt+1)))==NULL){
329         rrd_set_error("allocating updvals pointer array");
330         rrd_free(&rrd);
331         fclose(rrd_file);
332         return(-1);
333     }
335     if ((pdp_temp = malloc(sizeof(rrd_value_t)
336                            *rrd.stat_head->ds_cnt))==NULL){
337         rrd_set_error("allocating pdp_temp ...");
338         free(updvals);
339         rrd_free(&rrd);
340         fclose(rrd_file);
341         return(-1);
342     }
344     if ((tmpl_idx = malloc(sizeof(unsigned long)
345                            *(rrd.stat_head->ds_cnt+1)))==NULL){
346         rrd_set_error("allocating tmpl_idx ...");
347         free(pdp_temp);
348         free(updvals);
349         rrd_free(&rrd);
350         fclose(rrd_file);
351         return(-1);
352     }
353     /* initialize tmplt redirector */
354     /* default config example (assume DS 1 is a CDEF DS)
355        tmpl_idx[0] -> 0; (time)
356        tmpl_idx[1] -> 1; (DS 0)
357        tmpl_idx[2] -> 3; (DS 2)
358        tmpl_idx[3] -> 4; (DS 3) */
359     tmpl_idx[0] = 0; /* time */
360     for (i = 1, ii = 1 ; i <= rrd.stat_head->ds_cnt ; i++) 
361         {
362            if (dst_conv(rrd.ds_def[i-1].dst) != DST_CDEF)
363               tmpl_idx[ii++]=i;
364         }
365     tmpl_cnt= ii;
367     if (tmplt) {
368         /* we should work on a writeable copy here */
369         char *dsname;
370         unsigned int tmpl_len;
371         char *tmplt_copy = strdup(tmplt);
372         dsname = tmplt_copy;
373         tmpl_cnt = 1; /* the first entry is the time */
374         tmpl_len = strlen(tmplt_copy);
375         for(i=0;i<=tmpl_len ;i++) {
376             if (tmplt_copy[i] == ':' || tmplt_copy[i] == '\0') {
377                 tmplt_copy[i] = '\0';
378                 if (tmpl_cnt>rrd.stat_head->ds_cnt){
379                     rrd_set_error("tmplt contains more DS definitions than RRD");
380                     free(updvals); free(pdp_temp);
381                     free(tmpl_idx); rrd_free(&rrd);
382                     fclose(rrd_file); return(-1);
383                 }
384                 if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd,dsname)) == -1){
385                     rrd_set_error("unknown DS name '%s'",dsname);
386                     free(updvals); free(pdp_temp);
387                     free(tmplt_copy);
388                     free(tmpl_idx); rrd_free(&rrd);
389                     fclose(rrd_file); return(-1);
390                 } else {
391                   /* the first element is always the time */
392                   tmpl_idx[tmpl_cnt-1]++; 
393                   /* go to the next entry on the tmplt_copy */
394                   dsname = &tmplt_copy[i+1];
395                   /* fix the damage we did before */
396                   if (i<tmpl_len) {
397                      tmplt_copy[i]=':';
398                   } 
400                 }
401             }       
402         }
403         free(tmplt_copy);
404     }
405     if ((pdp_new = malloc(sizeof(rrd_value_t)
406                           *rrd.stat_head->ds_cnt))==NULL){
407         rrd_set_error("allocating pdp_new ...");
408         free(updvals);
409         free(pdp_temp);
410         free(tmpl_idx);
411         rrd_free(&rrd);
412         fclose(rrd_file);
413         return(-1);
414     }
416 #ifdef HAVE_MMAP
417     rrd_mmaped_file = mmap(0, 
418                         rrd_filesize, 
419                         PROT_READ | PROT_WRITE, 
420                         MAP_SHARED, 
421                         fileno(rrd_file), 
422                         0);
423     if (rrd_mmaped_file == MAP_FAILED) {
424         rrd_set_error("error mmapping file %s", filename);
425         free(updvals);
426         free(pdp_temp);
427         free(tmpl_idx);
428         rrd_free(&rrd);
429         fclose(rrd_file);
430         return(-1);
431     }
432 #ifdef USE_MADVISE
433     /* when we use mmaping we tell the kernel the mmap equivalent
434        of POSIX_FADV_RANDOM */
435     madvise(rrd_mmaped_file,rrd_filesize,MADV_RANDOM);
436 #endif
437 #endif
438     /* loop through the arguments. */
439     for(arg_i=0; arg_i<argc;arg_i++) {
440         char *stepper = strdup(argv[arg_i]);
441         char *step_start = stepper;
442         char *p;
443         char *parsetime_error = NULL;
444         enum {atstyle, normal} timesyntax;
445         struct rrd_time_value ds_tv;
446         if (stepper == NULL){
447                 rrd_set_error("failed duplication argv entry");
448                 free(step_start);
449                 free(updvals);
450                 free(pdp_temp);  
451                 free(tmpl_idx);
452                 rrd_free(&rrd);
453 #ifdef HAVE_MMAP
454                 munmap(rrd_mmaped_file, rrd_filesize);
455 #endif
456                 fclose(rrd_file);
457                 return(-1);
458          }
459         /* initialize all ds input to unknown except the first one
460            which has always got to be set */
461         for(ii=1;ii<=rrd.stat_head->ds_cnt;ii++) updvals[ii] = "U";
462         updvals[0]=stepper;
463         /* separate all ds elements; first must be examined separately
464            due to alternate time syntax */
465         if ((p=strchr(stepper,'@'))!=NULL) {
466             timesyntax = atstyle;
467             *p = '\0';
468             stepper = p+1;
469         } else if ((p=strchr(stepper,':'))!=NULL) {
470             timesyntax = normal;
471             *p = '\0';
472             stepper = p+1;
473         } else {
474             rrd_set_error("expected timestamp not found in data source from %s",
475                           argv[arg_i]);
476             free(step_start);
477             break;
478         }
479         ii=1;
480         updvals[tmpl_idx[ii]] = stepper;
481         while (*stepper) {
482             if (*stepper == ':') {
483                 *stepper = '\0';
484                 ii++;
485                 if (ii<tmpl_cnt){                   
486                     updvals[tmpl_idx[ii]] = stepper+1;
487                 }
488             }
489             stepper++;
490         }
492         if (ii != tmpl_cnt-1) {
493             rrd_set_error("expected %lu data source readings (got %lu) from %s",
494                           tmpl_cnt-1, ii, argv[arg_i]);
495             free(step_start);
496             break;
497         }
498         
499         /* get the time from the reading ... handle N */
500         if (timesyntax == atstyle) {
501             if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
502                 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error );
503                 free(step_start);
504                 break;
505             }
506             if (ds_tv.type == RELATIVE_TO_END_TIME ||
507                 ds_tv.type == RELATIVE_TO_START_TIME) {
508                 rrd_set_error("specifying time relative to the 'start' "
509                               "or 'end' makes no sense here: %s",
510                               updvals[0]);
511                 free(step_start);
512                 break;
513             }
515             current_time = mktime(&ds_tv.tm) + ds_tv.offset;
516             current_time_usec = 0; /* FIXME: how to handle usecs here ? */
517             
518         } else if (strcmp(updvals[0],"N")==0){
519             gettimeofday(&tmp_time, 0);
520             normalize_time(&tmp_time);
521             current_time = tmp_time.tv_sec;
522             current_time_usec = tmp_time.tv_usec;
523         } else {
524             double tmp;
525             tmp = strtod(updvals[0], 0);
526             current_time = floor(tmp);
527             current_time_usec = (long)((tmp-(double)current_time) * 1000000.0);
528         }
529         /* dont do any correction for old version RRDs */
530         if(version < 3) 
531             current_time_usec = 0;
532         
533         if(current_time < rrd.live_head->last_up || 
534           (current_time == rrd.live_head->last_up && 
535            (long)current_time_usec <= (long)rrd.live_head->last_up_usec)) {
536             rrd_set_error("%s: illegal attempt to update using time %ld when "
537                           "last update time is %ld (minimum one second step)",
538                           filename, current_time, rrd.live_head->last_up);
539             free(step_start);
540             break;
541         }
542         
543         
544         /* seek to the beginning of the rra's */
545         if (rra_current != rra_begin) {
546 #ifndef HAVE_MMAP
547             if(fseek(rrd_file, rra_begin, SEEK_SET) != 0) {
548                 rrd_set_error("seek error in rrd");
549                 free(step_start);
550                 break;
551             }
552 #endif
553             rra_current = rra_begin;
554         }
555         rra_start = rra_begin;
557         /* when was the current pdp started */
558         proc_pdp_age = rrd.live_head->last_up % rrd.stat_head->pdp_step;
559         proc_pdp_st = rrd.live_head->last_up - proc_pdp_age;
561         /* when did the last pdp_st occur */
562         occu_pdp_age = current_time % rrd.stat_head->pdp_step;
563         occu_pdp_st = current_time - occu_pdp_age;
565         /* interval = current_time - rrd.live_head->last_up; */
566         interval    = (double)(current_time - rrd.live_head->last_up) 
567                     + (double)((long)current_time_usec - (long)rrd.live_head->last_up_usec)/1000000.0;
569         if (occu_pdp_st > proc_pdp_st){
570             /* OK we passed the pdp_st moment*/
571             pre_int =  (long)occu_pdp_st - rrd.live_head->last_up; /* how much of the input data
572                                                               * occurred before the latest
573                                                               * pdp_st moment*/
574             pre_int -= ((double)rrd.live_head->last_up_usec)/1000000.0; /* adjust usecs */
575             post_int = occu_pdp_age;                         /* how much after it */
576             post_int += ((double)current_time_usec)/1000000.0;  /* adjust usecs */
577         } else {
578             pre_int = interval;
579             post_int = 0;
580         }
582 #ifdef DEBUG
583         printf(
584                "proc_pdp_age %lu\t"
585                "proc_pdp_st %lu\t" 
586                "occu_pfp_age %lu\t" 
587                "occu_pdp_st %lu\t"
588                "int %lf\t"
589                "pre_int %lf\t"
590                "post_int %lf\n", proc_pdp_age, proc_pdp_st, 
591                 occu_pdp_age, occu_pdp_st,
592                interval, pre_int, post_int);
593 #endif
594     
595         /* process the data sources and update the pdp_prep 
596          * area accordingly */
597         for(i=0;i<rrd.stat_head->ds_cnt;i++){
598             enum dst_en dst_idx;
599             dst_idx= dst_conv(rrd.ds_def[i].dst);
601             /* make sure we do not build diffs with old last_ds values */
602             if(rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt < interval) {
603                 strncpy(rrd.pdp_prep[i].last_ds,"U",LAST_DS_LEN-1);
604                 rrd.pdp_prep[i].last_ds[LAST_DS_LEN-1]='\0';
605             }
607             /* NOTE: DST_CDEF should never enter this if block, because
608              * updvals[i+1][0] is initialized to 'U'; unless the caller
609              * accidently specified a value for the DST_CDEF. To handle 
610               * this case, an extra check is required. */
612             if((updvals[i+1][0] != 'U') &&
613                    (dst_idx != DST_CDEF) &&
614                rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {
615                double rate = DNAN;
616                /* the data source type defines how to process the data */
617                 /* pdp_new contains rate * time ... eg the bytes
618                  * transferred during the interval. Doing it this way saves
619                  * a lot of math operations */
620                 
622                 switch(dst_idx){
623                 case DST_COUNTER:
624                 case DST_DERIVE:
625             for(ii=0;updvals[i+1][ii] != '\0';ii++){
626                  if((updvals[i+1][ii] < '0' || updvals[i+1][ii] > '9') && (ii != 0 && updvals[i+1][ii] != '-')){
627                       rrd_set_error("not a simple integer: '%s'",updvals[i+1]);
628                       break;
629                  }
630             }
631             if (rrd_test_error()){
632                    break;
633             }
634                     if(rrd.pdp_prep[i].last_ds[0] != 'U'){
635                        pdp_new[i]= rrd_diff(updvals[i+1],rrd.pdp_prep[i].last_ds);
636                        if(dst_idx == DST_COUNTER) {
637                           /* simple overflow catcher suggested by Andres Kroonmaa */
638                           /* this will fail terribly for non 32 or 64 bit counters ... */
639                           /* are there any others in SNMP land ? */
640                           if (pdp_new[i] < (double)0.0 ) 
641                             pdp_new[i] += (double)4294967296.0 ;  /* 2^32 */
642                           if (pdp_new[i] < (double)0.0 ) 
643                             pdp_new[i] += (double)18446744069414584320.0; /* 2^64-2^32 */;
644                        }
645                        rate = pdp_new[i] / interval;
646                     }
647                    else {
648                      pdp_new[i]= DNAN;          
649                    }
650                    break;
651                 case DST_ABSOLUTE:
652                     errno = 0;
653                     pdp_new[i] = strtod(updvals[i+1],&endptr);
654                     if (errno > 0){
655                         rrd_set_error("converting  '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
656                         break;
657                     };
658                     if (endptr[0] != '\0'){
659                         rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
660                         break;
661                     }
662                     rate = pdp_new[i] / interval;                 
663                     break;
664                 case DST_GAUGE:
665                     errno = 0;
666                     pdp_new[i] = strtod(updvals[i+1],&endptr) * interval;
667                     if (errno > 0){
668                         rrd_set_error("converting  '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
669                         break;
670                     };
671                     if (endptr[0] != '\0'){
672                         rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
673                         break;
674                     }
675                     rate = pdp_new[i] / interval;                  
676                     break;
677                 default:
678                     rrd_set_error("rrd contains unknown DS type : '%s'",
679                                   rrd.ds_def[i].dst);
680                     break;
681                 }
682                 /* break out of this for loop if the error string is set */
683                 if (rrd_test_error()){
684                     break;
685                 }
686                /* make sure pdp_temp is neither too large or too small
687                 * if any of these occur it becomes unknown ...
688                 * sorry folks ... */
689                if ( ! isnan(rate) && 
690                     (( ! isnan(rrd.ds_def[i].par[DS_max_val].u_val) &&
691                          rate > rrd.ds_def[i].par[DS_max_val].u_val ) ||     
692                     ( ! isnan(rrd.ds_def[i].par[DS_min_val].u_val) &&
693                         rate < rrd.ds_def[i].par[DS_min_val].u_val ))){
694                   pdp_new[i] = DNAN;
695                }               
696             } else {
697                 /* no news is news all the same */
698                 pdp_new[i] = DNAN;
699             }
701             
702             /* make a copy of the command line argument for the next run */
703 #ifdef DEBUG
704             fprintf(stderr,
705                     "prep ds[%lu]\t"
706                     "last_arg '%s'\t"
707                     "this_arg '%s'\t"
708                     "pdp_new %10.2f\n",
709                     i,
710                     rrd.pdp_prep[i].last_ds,
711                     updvals[i+1], pdp_new[i]);
712 #endif
713             strncpy(rrd.pdp_prep[i].last_ds, updvals[i+1],LAST_DS_LEN-1);
714             rrd.pdp_prep[i].last_ds[LAST_DS_LEN-1]='\0';
715         }
716         /* break out of the argument parsing loop if the error_string is set */
717         if (rrd_test_error()){
718             free(step_start);
719             break;
720         }
721         /* has a pdp_st moment occurred since the last run ? */
723         if (proc_pdp_st == occu_pdp_st){
724             /* no we have not passed a pdp_st moment. therefore update is simple */
726             for(i=0;i<rrd.stat_head->ds_cnt;i++){
727                 if(isnan(pdp_new[i])) {            
728                     /* this is not realy accurate if we use subsecond data arival time
729                        should have thought of it when going subsecond resolution ...
730                        sorry next format change we will have it! */
731                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += floor(interval);          
732                 } else {
733                      if (isnan( rrd.pdp_prep[i].scratch[PDP_val].u_val )){
734                         rrd.pdp_prep[i].scratch[PDP_val].u_val= pdp_new[i];
735                      } else {
736                         rrd.pdp_prep[i].scratch[PDP_val].u_val+= pdp_new[i];
737                      }
738                 }
739 #ifdef DEBUG
740                 fprintf(stderr,
741                         "NO PDP  ds[%lu]\t"
742                         "value %10.2f\t"
743                         "unkn_sec %5lu\n",
744                         i,
745                         rrd.pdp_prep[i].scratch[PDP_val].u_val,
746                         rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
747 #endif
748             }   
749         } else {
750             /* an pdp_st has occurred. */
752             /* in pdp_prep[].scratch[PDP_val].u_val we have collected rate*seconds which 
753              * occurred up to the last run.        
754             pdp_new[] contains rate*seconds from the latest run.
755             pdp_temp[] will contain the rate for cdp */
757             for(i=0;i<rrd.stat_head->ds_cnt;i++){
758                 /* update pdp_prep to the current pdp_st. */
759                 double pre_unknown = 0.0;               
760                 if(isnan(pdp_new[i]))
761                     /* a final bit of unkonwn to be added bevore calculation
762                      * we use a tempaorary variable for this so that we 
763                      * don't have to turn integer lines before using the value */                
764                     pre_unknown = pre_int;
765                 else {
766                      if (isnan( rrd.pdp_prep[i].scratch[PDP_val].u_val )){
767                         rrd.pdp_prep[i].scratch[PDP_val].u_val=         pdp_new[i]/interval*pre_int;
768                      } else {
769                         rrd.pdp_prep[i].scratch[PDP_val].u_val+= pdp_new[i]/interval*pre_int;
770                      }
771                  }
772                 
774                 /* if too much of the pdp_prep is unknown we dump it */
775                 if ( 
776                     /* removed because this does not agree with the definition
777                        a heart beat can be unknown */
778                     /* (rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt 
779                      > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) || */
780                     /* if the interval is larger thatn mrhb we get NAN */
781                     (interval > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) ||
782                     (rrd.stat_head -> pdp_step / 2.0 <
783                      rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)) {
784                     pdp_temp[i] = DNAN;
785                 } else {
786                     pdp_temp[i] = rrd.pdp_prep[i].scratch[PDP_val].u_val
787                         / ((double)(occu_pdp_st - proc_pdp_st
788                                     - rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)
789                             -pre_unknown);
790                 }
792                 /* process CDEF data sources; remember each CDEF DS can
793                  * only reference other DS with a lower index number */
794             if (dst_conv(rrd.ds_def[i].dst) == DST_CDEF) {
795                    rpnp_t *rpnp;
796                    rpnp = rpn_expand((rpn_cdefds_t *) &(rrd.ds_def[i].par[DS_cdef]));
797                    /* substitue data values for OP_VARIABLE nodes */
798                    for (ii = 0; rpnp[ii].op != OP_END; ii++)
799                    {
800                           if (rpnp[ii].op == OP_VARIABLE) {
801                                  rpnp[ii].op = OP_NUMBER;
802                                  rpnp[ii].val =  pdp_temp[rpnp[ii].ptr];
803                           }
804                    }
805                    /* run the rpn calculator */
806                    if (rpn_calc(rpnp,&rpnstack,0,pdp_temp,i) == -1) {
807                           free(rpnp);
808                           break; /* exits the data sources pdp_temp loop */
809                    }
810                 }
811         
812                 /* make pdp_prep ready for the next run */
813                 if(isnan(pdp_new[i])){
814                     /* this is not realy accurate if we use subsecond data arival time
815                        should have thought of it when going subsecond resolution ...
816                        sorry next format change we will have it! */
817                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = floor(post_int);
818                     rrd.pdp_prep[i].scratch[PDP_val].u_val = DNAN;
819                 } else {
820                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = 0;
821                     rrd.pdp_prep[i].scratch[PDP_val].u_val = 
822                         pdp_new[i]/interval*post_int;
823                 }
825 #ifdef DEBUG
826                 fprintf(stderr,
827                         "PDP UPD ds[%lu]\t"
828                         "pdp_temp %10.2f\t"
829                         "new_prep %10.2f\t"
830                         "new_unkn_sec %5lu\n",
831                         i, pdp_temp[i],
832                         rrd.pdp_prep[i].scratch[PDP_val].u_val,
833                         rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
834 #endif
835             }
837                 /* if there were errors during the last loop, bail out here */
838             if (rrd_test_error()){
839                free(step_start);
840                break;
841             }
843                 /* compute the number of elapsed pdp_st moments */
844                 elapsed_pdp_st = (occu_pdp_st - proc_pdp_st) / rrd.stat_head -> pdp_step;
845 #ifdef DEBUG
846                 fprintf(stderr,"elapsed PDP steps: %lu\n", elapsed_pdp_st);
847 #endif
848                 if (rra_step_cnt == NULL)
849                 {
850                    rra_step_cnt = (unsigned long *) 
851                           malloc((rrd.stat_head->rra_cnt)* sizeof(unsigned long));
852                 }
854             for(i = 0, rra_start = rra_begin;
855                 i < rrd.stat_head->rra_cnt;
856             rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
857                 i++)
858                 {
859                 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
860                 start_pdp_offset = rrd.rra_def[i].pdp_cnt -
861                    (proc_pdp_st / rrd.stat_head -> pdp_step) % rrd.rra_def[i].pdp_cnt;
862         if (start_pdp_offset <= elapsed_pdp_st) {
863            rra_step_cnt[i] = (elapsed_pdp_st - start_pdp_offset) / 
864                       rrd.rra_def[i].pdp_cnt + 1;
865             } else {
866                    rra_step_cnt[i] = 0;
867                 }
869                 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) 
870                 {
871                    /* If this is a bulk update, we need to skip ahead in the seasonal
872                         * arrays so that they will be correct for the next observed value;
873                         * note that for the bulk update itself, no update will occur to
874                         * DEVSEASONAL or SEASONAL; futhermore, HWPREDICT and DEVPREDICT will
875                         * be set to DNAN. */
876            if (rra_step_cnt[i] > 2) 
877                    {
878                           /* skip update by resetting rra_step_cnt[i],
879                            * note that this is not data source specific; this is due
880                            * to the bulk update, not a DNAN value for the specific data
881                            * source. */
882                           rra_step_cnt[i] = 0;
883               lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st, 
884                              &last_seasonal_coef);
885                       lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st + 1,
886                              &seasonal_coef);
887                    }
888                 
889                   /* periodically run a smoother for seasonal effects */
890                   /* Need to use first cdp parameter buffer to track
891                    * burnin (burnin requires a specific smoothing schedule).
892                    * The CDP_init_seasonal parameter is really an RRA level,
893                    * not a data source within RRA level parameter, but the rra_def
894                    * is read only for rrd_update (not flushed to disk). */
895                   iii = i*(rrd.stat_head -> ds_cnt);
896                   if (rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt 
897                           <= BURNIN_CYCLES)
898                   {
899                      if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st 
900                                  > rrd.rra_def[i].row_cnt - 1) {
901                            /* mark off one of the burnin cycles */
902                            ++(rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt);
903                        schedule_smooth = 1;
904                          }  
905                   } else {
906                          /* someone has no doubt invented a trick to deal with this
907                           * wrap around, but at least this code is clear. */
908                          if (rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt >
909                              rrd.rra_ptr[i].cur_row)
910                          {
911                                  /* here elapsed_pdp_st = rra_step_cnt[i] because of 1-1
912                                   * mapping between PDP and CDP */
913                                  if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
914                                         >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
915                                  {
916 #ifdef DEBUG
917                                         fprintf(stderr,
918                                         "schedule_smooth 1: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
919                     rrd.rra_ptr[i].cur_row, elapsed_pdp_st, 
920                                         rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
921 #endif
922                                         schedule_smooth = 1;
923                                  }
924              } else {
925                                  /* can't rely on negative numbers because we are working with
926                                   * unsigned values */
927                                  /* Don't need modulus here. If we've wrapped more than once, only
928                                   * one smooth is executed at the end. */
929                                  if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st >= rrd.rra_def[i].row_cnt
930                                         && rrd.rra_ptr[i].cur_row + elapsed_pdp_st - rrd.rra_def[i].row_cnt
931                                         >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
932                                  {
933 #ifdef DEBUG
934                                         fprintf(stderr,
935                                         "schedule_smooth 2: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
936                     rrd.rra_ptr[i].cur_row, elapsed_pdp_st, 
937                                         rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
938 #endif
939                                         schedule_smooth = 1;
940                                  }
941                          }
942                   }
944               rra_current = ftell(rrd_file); 
945                 } /* if cf is DEVSEASONAL or SEASONAL */
947         if (rrd_test_error()) break;
949                     /* update CDP_PREP areas */
950                     /* loop over data soures within each RRA */
951                     for(ii = 0;
952                         ii < rrd.stat_head->ds_cnt;
953                         ii++)
954                         {
955                         
956                         /* iii indexes the CDP prep area for this data source within the RRA */
957                         iii=i*rrd.stat_head->ds_cnt+ii;
959                         if (rrd.rra_def[i].pdp_cnt > 1) {
960                           
961                            if (rra_step_cnt[i] > 0) {
962                            /* If we are in this block, as least 1 CDP value will be written to
963                                 * disk, this is the CDP_primary_val entry. If more than 1 value needs
964                                 * to be written, then the "fill in" value is the CDP_secondary_val
965                                 * entry. */
966                                   if (isnan(pdp_temp[ii]))
967                   {
968                                          rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += start_pdp_offset;
969                                          rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
970                                   } else {
971                                          /* CDP_secondary value is the RRA "fill in" value for intermediary
972                                           * CDP data entries. No matter the CF, the value is the same because
973                                           * the average, max, min, and last of a list of identical values is
974                                           * the same, namely, the value itself. */
975                                          rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = pdp_temp[ii];
976                                   }
977                      
978                                   if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt
979                                       > rrd.rra_def[i].pdp_cnt*
980                                       rrd.rra_def[i].par[RRA_cdp_xff_val].u_val)
981                                   {
982                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
983                                          /* initialize carry over */
984                                          if (current_cf == CF_AVERAGE) {
985                                                    if (isnan(pdp_temp[ii])) { 
986                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
987                                                    } else {
988                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
989                                                                  ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
990                                                    }
991                                          } else {
992                                                 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
993                                          }
994                                   } else {
995                                          rrd_value_t cum_val, cur_val; 
996                                      switch (current_cf) {
997                                                 case CF_AVERAGE:
998                                                   cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, 0.0);
999                                                   cur_val = IFDNAN(pdp_temp[ii],0.0);
1000                           rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val =
1001                                                (cum_val + cur_val * start_pdp_offset) /
1002                                            (rrd.rra_def[i].pdp_cnt
1003                                                -rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt);
1004                                                    /* initialize carry over value */
1005                                                    if (isnan(pdp_temp[ii])) { 
1006                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
1007                                                    } else {
1008                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
1009                                                                  ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
1010                                                    }
1011                                                    break;
1012                                                 case CF_MAXIMUM:
1013                                                   cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, -DINF);
1014                                                   cur_val = IFDNAN(pdp_temp[ii],-DINF);
1015 #ifdef DEBUG
1016                                                   if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
1017                                                           isnan(pdp_temp[ii])) {
1018                                                      fprintf(stderr,
1019                                                                 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1020                                                                 i,ii);
1021                                                          exit(-1);
1022                                                   }
1023 #endif
1024                                                   if (cur_val > cum_val)
1025                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
1026                                                   else
1027                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
1028                                                   /* initialize carry over value */
1029                                                   rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1030                                                   break;
1031                                                 case CF_MINIMUM:
1032                                                   cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, DINF);
1033                                                   cur_val = IFDNAN(pdp_temp[ii],DINF);
1034 #ifdef DEBUG
1035                                                   if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
1036                                                           isnan(pdp_temp[ii])) {
1037                                                      fprintf(stderr,
1038                                                                 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1039                                                                 i,ii);
1040                                                          exit(-1);
1041                                                   }
1042 #endif
1043                                                   if (cur_val < cum_val)
1044                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
1045                                                   else
1046                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
1047                                                   /* initialize carry over value */
1048                                                   rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1049                                                   break;
1050                                                 case CF_LAST:
1051                                                 default:
1052                                                    rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = pdp_temp[ii];
1053                                                    /* initialize carry over value */
1054                                                    rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1055                                                 break;
1056                                          }
1057                                   } /* endif meets xff value requirement for a valid value */
1058                                   /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
1059                                    * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1060                                   if (isnan(pdp_temp[ii]))
1061                                          rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 
1062                                                 (elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt;
1063                                   else
1064                                          rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 0;
1065                } else  /* rra_step_cnt[i]  == 0 */
1066                            {
1067 #ifdef DEBUG
1068                                   if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val)) {
1069                                   fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, DNAN\n",
1070                                          i,ii);
1071                                   } else {
1072                                   fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, %10.2f\n",
1073                                          i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1074                                   }
1075 #endif
1076                                   if (isnan(pdp_temp[ii])) {
1077                                  rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += elapsed_pdp_st;
1078                                   } else if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val))
1079                                   {
1080                                          if (current_cf == CF_AVERAGE) {
1081                                             rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
1082                                                    elapsed_pdp_st;
1083                                          } else {
1084                                             rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1085                                          }
1086 #ifdef DEBUG
1087                                          fprintf(stderr,"Initialize CDP_val for RRA %lu DS %lu: %10.2f\n",
1088                                             i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1089 #endif
1090                                   } else {
1091                                          switch (current_cf) {
1092                                          case CF_AVERAGE:
1093                                             rrd.cdp_prep[iii].scratch[CDP_val].u_val += pdp_temp[ii] *
1094                                                    elapsed_pdp_st;
1095                                                 break;
1096                                          case CF_MINIMUM:
1097                                                 if (pdp_temp[ii] < rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1098                                                    rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1099                                                 break; 
1100                                          case CF_MAXIMUM:
1101                                                 if (pdp_temp[ii] > rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1102                                                    rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1103                                                 break; 
1104                                          case CF_LAST:
1105                                          default:
1106                                                 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1107                                                 break;
1108                                          }
1109                                   }
1110                            }
1111                         } else { /* rrd.rra_def[i].pdp_cnt == 1 */
1112                            if (elapsed_pdp_st > 2)
1113                            {
1114                                    switch (current_cf) {
1115                                    case CF_AVERAGE:
1116                                    default:
1117                                   rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val=pdp_temp[ii];
1118                                   rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val=pdp_temp[ii];
1119                                           break;
1120                    case CF_SEASONAL:
1121                                    case CF_DEVSEASONAL:
1122                                           /* need to update cached seasonal values, so they are consistent
1123                                            * with the bulk update */
1124                       /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1125                                            * CDP_last_deviation are the same. */
1126                       rrd.cdp_prep[iii].scratch[CDP_hw_last_seasonal].u_val =
1127                                                  last_seasonal_coef[ii];
1128                                           rrd.cdp_prep[iii].scratch[CDP_hw_seasonal].u_val =
1129                                                  seasonal_coef[ii];
1130                                           break;
1131                    case CF_HWPREDICT:
1132                                           /* need to update the null_count and last_null_count.
1133                                            * even do this for non-DNAN pdp_temp because the
1134                                            * algorithm is not learning from batch updates. */
1135                                           rrd.cdp_prep[iii].scratch[CDP_null_count].u_cnt += 
1136                                                  elapsed_pdp_st;
1137                                           rrd.cdp_prep[iii].scratch[CDP_last_null_count].u_cnt += 
1138                                                  elapsed_pdp_st - 1;
1139                                           /* fall through */
1140                                    case CF_DEVPREDICT:
1141                                   rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
1142                                   rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
1143                                           break;
1144                    case CF_FAILURES:
1145                                           /* do not count missed bulk values as failures */
1146                                   rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = 0;
1147                                   rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = 0;
1148                                           /* need to reset violations buffer.
1149                                            * could do this more carefully, but for now, just
1150                                            * assume a bulk update wipes away all violations. */
1151                       erase_violations(&rrd, iii, i);
1152                                           break;
1153                                    }
1154                            } 
1155                         } /* endif rrd.rra_def[i].pdp_cnt == 1 */
1157                         if (rrd_test_error()) break;
1159                         } /* endif data sources loop */
1160         } /* end RRA Loop */
1162                 /* this loop is only entered if elapsed_pdp_st < 3 */
1163                 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val; 
1164                          j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val)
1165                 {
1166                for(i = 0, rra_start = rra_begin;
1167                    i < rrd.stat_head->rra_cnt;
1168                rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1169                    i++)
1170                    {
1171                           if (rrd.rra_def[i].pdp_cnt > 1) continue;
1173                   current_cf = cf_conv(rrd.rra_def[i].cf_nam);
1174                           if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
1175                           {
1176                          lookup_seasonal(&rrd,i,rra_start,rrd_file,
1177                                     elapsed_pdp_st + (scratch_idx == CDP_primary_val ? 1 : 2),
1178                                 &seasonal_coef);
1179                  rra_current = ftell(rrd_file);
1180                           }
1181                           if (rrd_test_error()) break;
1182                       /* loop over data soures within each RRA */
1183                       for(ii = 0;
1184                           ii < rrd.stat_head->ds_cnt;
1185                           ii++)
1186                           {
1187                              update_aberrant_CF(&rrd,pdp_temp[ii],current_cf,
1188                                         i*(rrd.stat_head->ds_cnt) + ii,i,ii,
1189                                     scratch_idx, seasonal_coef);
1190                           }
1191            } /* end RRA Loop */
1192                    if (rrd_test_error()) break;
1193             } /* end elapsed_pdp_st loop */
1195                 if (rrd_test_error()) break;
1197                 /* Ready to write to disk */
1198                 /* Move sequentially through the file, writing one RRA at a time.
1199                  * Note this architecture divorces the computation of CDP with
1200                  * flushing updated RRA entries to disk. */
1201             for(i = 0, rra_start = rra_begin;
1202                 i < rrd.stat_head->rra_cnt;
1203             rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1204                 i++) {
1205                 /* is th5Aere anything to write for this RRA? If not, continue. */
1206         if (rra_step_cnt[i] == 0) continue;
1208                 /* write the first row */
1209 #ifdef DEBUG
1210         fprintf(stderr,"  -- RRA Preseek %ld\n",ftell(rrd_file));
1211 #endif
1212             rrd.rra_ptr[i].cur_row++;
1213             if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
1214                    rrd.rra_ptr[i].cur_row = 0; /* wrap around */
1215                 /* positition on the first row */
1216                 rra_pos_tmp = rra_start +
1217                    (rrd.stat_head->ds_cnt)*(rrd.rra_ptr[i].cur_row)*sizeof(rrd_value_t);
1218                 if(rra_pos_tmp != rra_current) {
1219 #ifndef HAVE_MMAP
1220                    if(fseek(rrd_file, rra_pos_tmp, SEEK_SET) != 0){
1221                       rrd_set_error("seek error in rrd");
1222                       break;
1223                    }
1224 #endif
1225                    rra_current = rra_pos_tmp;
1226                 }
1228 #ifdef DEBUG
1229             fprintf(stderr,"  -- RRA Postseek %ld\n",ftell(rrd_file));
1230 #endif
1231                 scratch_idx = CDP_primary_val;
1232                 if (pcdp_summary != NULL)
1233                 {
1234                    rra_time = (current_time - current_time 
1235                    % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1236                    - ((rra_step_cnt[i]-1)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1237                 }
1238 #ifdef HAVE_MMAP
1239                 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file, 
1240                    pcdp_summary, &rra_time, rrd_mmaped_file);
1241 #else
1242                 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file, 
1243                    pcdp_summary, &rra_time);
1244 #endif
1245                 if (rrd_test_error()) break;
1247                 /* write other rows of the bulk update, if any */
1248                 scratch_idx = CDP_secondary_val;
1249                for ( ; rra_step_cnt[i] > 1; rra_step_cnt[i]--)
1250                 {
1251                   if (++rrd.rra_ptr[i].cur_row == rrd.rra_def[i].row_cnt)
1252                    {
1253 #ifdef DEBUG
1254               fprintf(stderr,"Wraparound for RRA %s, %lu updates left\n",
1255                           rrd.rra_def[i].cf_nam, rra_step_cnt[i] - 1);
1256 #endif
1257                           /* wrap */
1258                           rrd.rra_ptr[i].cur_row = 0;
1259                           /* seek back to beginning of current rra */
1260                       if (fseek(rrd_file, rra_start, SEEK_SET) != 0)
1261                           {
1262                          rrd_set_error("seek error in rrd");
1263                          break;
1264                           }
1265 #ifdef DEBUG
1266                   fprintf(stderr,"  -- Wraparound Postseek %ld\n",ftell(rrd_file));
1267 #endif
1268                           rra_current = rra_start;
1269                    }
1270                    if (pcdp_summary != NULL)
1271                    {
1272                       rra_time = (current_time - current_time 
1273                       % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1274                       - ((rra_step_cnt[i]-2)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1275                    }
1276 #ifdef HAVE_MMAP
1277                    pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1278                       pcdp_summary, &rra_time, rrd_mmaped_file);
1279 #else
1280                    pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1281                       pcdp_summary, &rra_time);
1282 #endif
1283                 }
1284                 
1285                 if (rrd_test_error())
1286                   break;
1287                 } /* RRA LOOP */
1289             /* break out of the argument parsing loop if error_string is set */
1290             if (rrd_test_error()){
1291                    free(step_start);
1292                    break;
1293             } 
1294             
1295         } /* endif a pdp_st has occurred */ 
1296         rrd.live_head->last_up = current_time;
1297         rrd.live_head->last_up_usec = current_time_usec; 
1298         free(step_start);
1299     } /* function argument loop */
1301     if (seasonal_coef != NULL) free(seasonal_coef);
1302     if (last_seasonal_coef != NULL) free(last_seasonal_coef);
1303         if (rra_step_cnt != NULL) free(rra_step_cnt);
1304     rpnstack_free(&rpnstack);
1306 #ifdef HAVE_MMAP
1307     if (munmap(rrd_mmaped_file, rrd_filesize) == -1) {
1308             rrd_set_error("error writing(unmapping) file: %s", filename);
1309     }
1310 #endif    
1311     /* if we got here and if there is an error and if the file has not been
1312      * written to, then close things up and return. */
1313     if (rrd_test_error()) {
1314         free(updvals);
1315         free(tmpl_idx);
1316         rrd_free(&rrd);
1317         free(pdp_temp);
1318         free(pdp_new);
1319         fclose(rrd_file);
1320         return(-1);
1321     }
1323     /* aargh ... that was tough ... so many loops ... anyway, its done.
1324      * we just need to write back the live header portion now*/
1326     if (fseek(rrd_file, (sizeof(stat_head_t)
1327                          + sizeof(ds_def_t)*rrd.stat_head->ds_cnt 
1328                          + sizeof(rra_def_t)*rrd.stat_head->rra_cnt),
1329               SEEK_SET) != 0) {
1330         rrd_set_error("seek rrd for live header writeback");
1331         free(updvals);
1332         free(tmpl_idx);
1333         rrd_free(&rrd);
1334         free(pdp_temp);
1335         free(pdp_new);
1336         fclose(rrd_file);
1337         return(-1);
1338     }
1340     if(version >= 3) {
1341             if(fwrite( rrd.live_head,
1342                        sizeof(live_head_t), 1, rrd_file) != 1){
1343                 rrd_set_error("fwrite live_head to rrd");
1344                 free(updvals);
1345                 rrd_free(&rrd);
1346                 free(tmpl_idx);
1347                 free(pdp_temp);
1348                 free(pdp_new);
1349                 fclose(rrd_file);
1350                 return(-1);
1351             }
1352     }
1353     else {
1354             if(fwrite( &rrd.live_head->last_up,
1355                        sizeof(time_t), 1, rrd_file) != 1){
1356                 rrd_set_error("fwrite live_head to rrd");
1357                 free(updvals);
1358                 rrd_free(&rrd);
1359                 free(tmpl_idx);
1360                 free(pdp_temp);
1361                 free(pdp_new);
1362                 fclose(rrd_file);
1363                 return(-1);
1364             }
1365     }
1366             
1368     if(fwrite( rrd.pdp_prep,
1369                sizeof(pdp_prep_t),
1370                rrd.stat_head->ds_cnt, rrd_file) != rrd.stat_head->ds_cnt){
1371         rrd_set_error("ftwrite pdp_prep to rrd");
1372         free(updvals);
1373         rrd_free(&rrd);
1374         free(tmpl_idx);
1375         free(pdp_temp);
1376         free(pdp_new);
1377         fclose(rrd_file);
1378         return(-1);
1379     }
1381     if(fwrite( rrd.cdp_prep,
1382                sizeof(cdp_prep_t),
1383                rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt, rrd_file) 
1384        != rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt){
1386         rrd_set_error("ftwrite cdp_prep to rrd");
1387         free(updvals);
1388         free(tmpl_idx);
1389         rrd_free(&rrd);
1390         free(pdp_temp);
1391         free(pdp_new);
1392         fclose(rrd_file);
1393         return(-1);
1394     }
1396     if(fwrite( rrd.rra_ptr,
1397                sizeof(rra_ptr_t), 
1398                rrd.stat_head->rra_cnt,rrd_file) != rrd.stat_head->rra_cnt){
1399         rrd_set_error("fwrite rra_ptr to rrd");
1400         free(updvals);
1401         free(tmpl_idx);
1402         rrd_free(&rrd);
1403         free(pdp_temp);
1404         free(pdp_new);
1405         fclose(rrd_file);
1406         return(-1);
1407     }
1408     
1409     /* OK now close the files and free the memory */
1410     if(fclose(rrd_file) != 0){
1411         rrd_set_error("closing rrd");
1412         free(updvals);
1413         free(tmpl_idx);
1414         rrd_free(&rrd);
1415         free(pdp_temp);
1416         free(pdp_new);
1417         return(-1);
1418     }
1420     /* calling the smoothing code here guarantees at most
1421          * one smoothing operation per rrd_update call. Unfortunately,
1422          * it is possible with bulk updates, or a long-delayed update
1423          * for smoothing to occur off-schedule. This really isn't
1424          * critical except during the burning cycles. */
1425         if (schedule_smooth)
1426         {
1427           rrd_file = fopen(filename,"rb+");
1428           
1430           rra_start = rra_begin;
1431           for (i = 0; i < rrd.stat_head -> rra_cnt; ++i)
1432           {
1433             if (cf_conv(rrd.rra_def[i].cf_nam) == CF_DEVSEASONAL ||
1434                 cf_conv(rrd.rra_def[i].cf_nam) == CF_SEASONAL)
1435             {
1436 #ifdef DEBUG
1437               fprintf(stderr,"Running smoother for rra %ld\n",i);
1438 #endif
1439               apply_smoother(&rrd,i,rra_start,rrd_file);
1440               if (rrd_test_error())
1441                 break;
1442             }
1443             rra_start += rrd.rra_def[i].row_cnt
1444               *rrd.stat_head->ds_cnt*sizeof(rrd_value_t);
1445           }
1446           fclose(rrd_file);
1447         }
1448     rrd_free(&rrd);
1449     free(updvals);
1450     free(tmpl_idx);
1451     free(pdp_new);
1452     free(pdp_temp);
1453     return(0);
1456 /*
1457  * get exclusive lock to whole file.
1458  * lock gets removed when we close the file
1459  *
1460  * returns 0 on success
1461  */
1462 int
1463 LockRRD(FILE *rrdfile)
1465     int rrd_fd;         /* File descriptor for RRD */
1466     int rcstat;
1468     rrd_fd = fileno(rrdfile);
1470         {
1471 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
1472     struct _stat st;
1474     if ( _fstat( rrd_fd, &st ) == 0 ) {
1475             rcstat = _locking ( rrd_fd, _LK_NBLCK, st.st_size );
1476     } else {
1477             rcstat = -1;
1478     }
1479 #else
1480     struct flock        lock;
1481     lock.l_type = F_WRLCK;    /* exclusive write lock */
1482     lock.l_len = 0;           /* whole file */
1483     lock.l_start = 0;         /* start of file */
1484     lock.l_whence = SEEK_SET;   /* end of file */
1486     rcstat = fcntl(rrd_fd, F_SETLK, &lock);
1487 #endif
1488         }
1490     return(rcstat);
1494 #ifdef HAVE_MMAP
1495 info_t
1496 *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1497                unsigned short CDP_scratch_idx, 
1498 #ifndef DEBUG
1499 FILE UNUSED(*rrd_file),
1500 #else
1501 FILE *rrd_file,
1502 #endif
1503                    info_t *pcdp_summary, time_t *rra_time, void *rrd_mmaped_file)
1504 #else
1505 info_t
1506 *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1507                unsigned short CDP_scratch_idx, FILE *rrd_file,
1508                    info_t *pcdp_summary, time_t *rra_time)
1509 #endif
1511    unsigned long ds_idx, cdp_idx;
1512    infoval iv;
1513   
1514    for (ds_idx = 0; ds_idx < rrd -> stat_head -> ds_cnt; ds_idx++)
1515    {
1516       /* compute the cdp index */
1517       cdp_idx =rra_idx * (rrd -> stat_head->ds_cnt) + ds_idx;
1518 #ifdef DEBUG
1519           fprintf(stderr,"  -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1520              rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,ftell(rrd_file),
1521              rrd -> rra_def[rra_idx].cf_nam);
1522 #endif 
1523       if (pcdp_summary != NULL)
1524           {
1525              iv.u_val = rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
1526              /* append info to the return hash */
1527                  pcdp_summary = info_push(pcdp_summary,
1528                  sprintf_alloc("[%d]RRA[%s][%lu]DS[%s]",
1529                  *rra_time, rrd->rra_def[rra_idx].cf_nam, 
1530                  rrd->rra_def[rra_idx].pdp_cnt, rrd->ds_def[ds_idx].ds_nam),
1531          RD_I_VAL, iv);
1532           }
1533 #ifdef HAVE_MMAP
1534           memcpy((char *)rrd_mmaped_file + *rra_current,
1535                           &(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1536                           sizeof(rrd_value_t));
1537 #else
1538           if(fwrite(&(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1539                  sizeof(rrd_value_t),1,rrd_file) != 1)
1540           { 
1541              rrd_set_error("writing rrd");
1542              return 0;
1543           }
1544 #endif
1545           *rra_current += sizeof(rrd_value_t);
1546         }
1547         return (pcdp_summary);