Code

added heartbeat grahics
[rrdtool.git] / src / rrd_update.c
1 /*****************************************************************************
2  * RRDtool 1.2.12  Copyright by Tobi Oetiker, 1997-2005
3  *****************************************************************************
4  * rrd_update.c  RRD Update Function
5  *****************************************************************************
6  * $Id$
7  *****************************************************************************/
9 #include "rrd_tool.h"
10 #include <sys/types.h>
11 #include <fcntl.h>
12 #ifdef HAVE_MMAP
13  #include <sys/mman.h>
14 #endif
16 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
17  #include <sys/locking.h>
18  #include <sys/stat.h>
19  #include <io.h>
20 #endif
22 #include "rrd_hw.h"
23 #include "rrd_rpncalc.h"
25 #include "rrd_is_thread_safe.h"
26 #include "unused.h"
28 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
29 /*
30  * WIN32 does not have gettimeofday     and struct timeval. This is a quick and dirty
31  * replacement.
32  */
33 #include <sys/timeb.h>
35 #ifndef __MINGW32__
36 struct timeval {
37         time_t tv_sec; /* seconds */
38         long tv_usec;  /* microseconds */
39 };
40 #endif
42 struct __timezone {
43         int  tz_minuteswest; /* minutes W of Greenwich */
44         int  tz_dsttime;     /* type of dst correction */
45 };
47 static int gettimeofday(struct timeval *t, struct __timezone *tz) {
49         struct _timeb current_time;
51         _ftime(&current_time);
53         t->tv_sec  = current_time.time;
54         t->tv_usec = current_time.millitm * 1000;
56         return 0;
57 }
59 #endif
60 /*
61  * normilize time as returned by gettimeofday. usec part must
62  * be always >= 0
63  */
64 static void normalize_time(struct timeval *t)
65 {
66         if(t->tv_usec < 0) {
67                 t->tv_sec--;
68                 t->tv_usec += 1000000L;
69         }
70 }
72 /* Local prototypes */
73 int LockRRD(FILE *rrd_file);
74 #ifdef HAVE_MMAP
75 info_t *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, 
76                                         unsigned long *rra_current,
77                                         unsigned short CDP_scratch_idx,
78 #ifndef DEBUG
79 FILE UNUSED(*rrd_file),
80 #else
81 FILE *rrd_file,
82 #endif
83                                         info_t *pcdp_summary, time_t *rra_time, void *rrd_mmaped_file);
84 #else
85 info_t *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, 
86                                         unsigned long *rra_current,
87                                         unsigned short CDP_scratch_idx, FILE *rrd_file,
88                                         info_t *pcdp_summary, time_t *rra_time);
89 #endif
90 int rrd_update_r(char *filename, char *tmplt, int argc, char **argv);
91 int _rrd_update(char *filename, char *tmplt, int argc, char **argv, 
92                                         info_t*);
94 #define IFDNAN(X,Y) (isnan(X) ? (Y) : (X));
97 #ifdef STANDALONE
98 int 
99 main(int argc, char **argv){
100         rrd_update(argc,argv);
101         if (rrd_test_error()) {
102                 printf("RRDtool " PACKAGE_VERSION "  Copyright by Tobi Oetiker, 1997-2005\n\n"
103                         "Usage: rrdupdate filename\n"
104                         "\t\t\t[--template|-t ds-name:ds-name:...]\n"
105                         "\t\t\ttime|N:value[:value...]\n\n"
106                         "\t\t\tat-time@value[:value...]\n\n"
107                         "\t\t\t[ time:value[:value...] ..]\n\n");
108                                    
109                 printf("ERROR: %s\n",rrd_get_error());
110                 rrd_clear_error();                                                            
111                 return 1;
112         }
113         return 0;
115 #endif
117 info_t *rrd_update_v(int argc, char **argv)
119     char             *tmplt = NULL;          
120         info_t *result = NULL;
121         infoval rc;
122     optind = 0; opterr = 0;  /* initialize getopt */
124     while (1) {
125                 static struct option long_options[] =
126                         {
127                                 {"template",      required_argument, 0, 't'},
128                                 {0,0,0,0}
129                         };
130                 int option_index = 0;
131                 int opt;
132                 opt = getopt_long(argc, argv, "t:", 
133                                                   long_options, &option_index);
134                 
135                 if (opt == EOF)
136                         break;
137                 
138                 switch(opt) {
139                 case 't':
140                         tmplt = optarg;
141                         break;
142                 
143                 case '?':
144                         rrd_set_error("unknown option '%s'",argv[optind-1]);
145             rc.u_int = -1;
146                         goto end_tag;
147                 }
148     }
150     /* need at least 2 arguments: filename, data. */
151     if (argc-optind < 2) {
152                 rrd_set_error("Not enough arguments");
153         rc.u_int = -1;
154                 goto end_tag;
155     }
156     result = info_push(NULL,sprintf_alloc("return_value"),RD_I_INT,rc);
157         rc.u_int = _rrd_update(argv[optind], tmplt,
158                       argc - optind - 1, argv + optind + 1, result);
159     result->value.u_int = rc.u_int;
160 end_tag:
161     return result;
164 int
165 rrd_update(int argc, char **argv)
167     char             *tmplt = NULL;          
168     int rc;
169     optind = 0; opterr = 0;  /* initialize getopt */
171     while (1) {
172                 static struct option long_options[] =
173                         {
174                                 {"template",      required_argument, 0, 't'},
175                                 {0,0,0,0}
176                         };
177                 int option_index = 0;
178                 int opt;
179                 opt = getopt_long(argc, argv, "t:", 
180                                                   long_options, &option_index);
181                 
182                 if (opt == EOF)
183                         break;
184                 
185                 switch(opt) {
186                 case 't':
187                         tmplt = optarg;
188                         break;
189                 
190                 case '?':
191                         rrd_set_error("unknown option '%s'",argv[optind-1]);
192                         return(-1);
193                 }
194     }
196     /* need at least 2 arguments: filename, data. */
197     if (argc-optind < 2) {
198                 rrd_set_error("Not enough arguments");
200                 return -1;
201     }
202  
203         rc = rrd_update_r(argv[optind], tmplt,
204                       argc - optind - 1, argv + optind + 1);
205     return rc;
208 int
209 rrd_update_r(char *filename, char *tmplt, int argc, char **argv)
211    return _rrd_update(filename, tmplt, argc, argv, NULL);
214 int
215 _rrd_update(char *filename, char *tmplt, int argc, char **argv, 
216    info_t *pcdp_summary)
219     int              arg_i = 2;
220     short            j;
221     unsigned long    i,ii,iii=1;
223     unsigned long    rra_begin;          /* byte pointer to the rra
224                                           * area in the rrd file.  this
225                                           * pointer never changes value */
226     unsigned long    rra_start;          /* byte pointer to the rra
227                                           * area in the rrd file.  this
228                                           * pointer changes as each rrd is
229                                           * processed. */
230     unsigned long    rra_current;        /* byte pointer to the current write
231                                           * spot in the rrd file. */
232     unsigned long    rra_pos_tmp;        /* temporary byte pointer. */
233     double           interval,
234         pre_int,post_int;                /* interval between this and
235                                           * the last run */
236     unsigned long    proc_pdp_st;        /* which pdp_st was the last
237                                           * to be processed */
238     unsigned long    occu_pdp_st;        /* when was the pdp_st
239                                           * before the last update
240                                           * time */
241     unsigned long    proc_pdp_age;       /* how old was the data in
242                                           * the pdp prep area when it
243                                           * was last updated */
244     unsigned long    occu_pdp_age;       /* how long ago was the last
245                                           * pdp_step time */
246     rrd_value_t      *pdp_new;           /* prepare the incoming data
247                                           * to be added the the
248                                           * existing entry */
249     rrd_value_t      *pdp_temp;          /* prepare the pdp values 
250                                           * to be added the the
251                                           * cdp values */
253     long             *tmpl_idx;          /* index representing the settings
254                                             transported by the tmplt index */
255     unsigned long    tmpl_cnt = 2;       /* time and data */
257     FILE             *rrd_file;
258     rrd_t            rrd;
259     time_t           current_time = 0;
260     time_t           rra_time = 0;       /* time of update for a RRA */
261     unsigned long    current_time_usec=0;/* microseconds part of current time */
262     struct timeval   tmp_time;           /* used for time conversion */
264     char             **updvals;
265     int              schedule_smooth = 0;
266         rrd_value_t      *seasonal_coef = NULL, *last_seasonal_coef = NULL;
267                                          /* a vector of future Holt-Winters seasonal coefs */
268     unsigned long    elapsed_pdp_st;
269                                          /* number of elapsed PDP steps since last update */
270     unsigned long    *rra_step_cnt = NULL;
271                                          /* number of rows to be updated in an RRA for a data
272                                           * value. */
273     unsigned long    start_pdp_offset;
274                                          /* number of PDP steps since the last update that
275                                           * are assigned to the first CDP to be generated
276                                           * since the last update. */
277     unsigned short   scratch_idx;
278                                          /* index into the CDP scratch array */
279     enum cf_en       current_cf;
280                                          /* numeric id of the current consolidation function */
281     rpnstack_t       rpnstack; /* used for COMPUTE DS */
282     int              version;  /* rrd version */
283     char             *endptr; /* used in the conversion */
284 #ifdef HAVE_MMAP
285     void             *rrd_mmaped_file;
286     unsigned long    rrd_filesize;
287 #endif
289     rpnstack_init(&rpnstack);
291     /* need at least 1 arguments: data. */
292     if (argc < 1) {
293         rrd_set_error("Not enough arguments");
294         return -1;
295     }
296     
297     
299     if(rrd_open(filename,&rrd_file,&rrd, RRD_READWRITE)==-1){
300         return -1;
301     }
302     /* initialize time */
303     version = atoi(rrd.stat_head->version);
304     gettimeofday(&tmp_time, 0);
305     normalize_time(&tmp_time);
306     current_time = tmp_time.tv_sec;
307     if(version >= 3) {
308         current_time_usec = tmp_time.tv_usec;
309     }
310     else {
311         current_time_usec = 0;
312     }
314     rra_current = rra_start = rra_begin = ftell(rrd_file);
315     /* This is defined in the ANSI C standard, section 7.9.5.3:
317         When a file is opened with udpate mode ('+' as the second
318         or third character in the ... list of mode argument
319         variables), both input and ouptut may be performed on the
320         associated stream.  However, ...  input may not be directly
321         followed by output without an intervening call to a file
322         positioning function, unless the input oepration encounters
323         end-of-file. */
324 #ifdef HAVE_MMAP
325     fseek(rrd_file, 0, SEEK_END);
326     rrd_filesize = ftell(rrd_file);
327     fseek(rrd_file, rra_current, SEEK_SET);
328 #else
329     fseek(rrd_file, 0, SEEK_CUR);
330 #endif
332     
333     /* get exclusive lock to whole file.
334      * lock gets removed when we close the file.
335      */
336     if (LockRRD(rrd_file) != 0) {
337       rrd_set_error("could not lock RRD");
338       rrd_free(&rrd);
339       fclose(rrd_file);
340       return(-1);   
341     } 
343     if((updvals = malloc( sizeof(char*) * (rrd.stat_head->ds_cnt+1)))==NULL){
344         rrd_set_error("allocating updvals pointer array");
345         rrd_free(&rrd);
346         fclose(rrd_file);
347         return(-1);
348     }
350     if ((pdp_temp = malloc(sizeof(rrd_value_t)
351                            *rrd.stat_head->ds_cnt))==NULL){
352         rrd_set_error("allocating pdp_temp ...");
353         free(updvals);
354         rrd_free(&rrd);
355         fclose(rrd_file);
356         return(-1);
357     }
359     if ((tmpl_idx = malloc(sizeof(unsigned long)
360                            *(rrd.stat_head->ds_cnt+1)))==NULL){
361         rrd_set_error("allocating tmpl_idx ...");
362         free(pdp_temp);
363         free(updvals);
364         rrd_free(&rrd);
365         fclose(rrd_file);
366         return(-1);
367     }
368     /* initialize tmplt redirector */
369     /* default config example (assume DS 1 is a CDEF DS)
370        tmpl_idx[0] -> 0; (time)
371        tmpl_idx[1] -> 1; (DS 0)
372        tmpl_idx[2] -> 3; (DS 2)
373        tmpl_idx[3] -> 4; (DS 3) */
374     tmpl_idx[0] = 0; /* time */
375     for (i = 1, ii = 1 ; i <= rrd.stat_head->ds_cnt ; i++) 
376         {
377            if (dst_conv(rrd.ds_def[i-1].dst) != DST_CDEF)
378               tmpl_idx[ii++]=i;
379         }
380     tmpl_cnt= ii;
382     if (tmplt) {
383         /* we should work on a writeable copy here */
384         char *dsname;
385         unsigned int tmpl_len;
386         tmplt = strdup(tmplt);
387         dsname = tmplt;
388         tmpl_cnt = 1; /* the first entry is the time */
389         tmpl_len = strlen(tmplt);
390         for(i=0;i<=tmpl_len ;i++) {
391             if (tmplt[i] == ':' || tmplt[i] == '\0') {
392                 tmplt[i] = '\0';
393                 if (tmpl_cnt>rrd.stat_head->ds_cnt){
394                     rrd_set_error("tmplt contains more DS definitions than RRD");
395                     free(updvals); free(pdp_temp);
396                     free(tmpl_idx); rrd_free(&rrd);
397                     fclose(rrd_file); return(-1);
398                 }
399                 if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd,dsname)) == -1){
400                     rrd_set_error("unknown DS name '%s'",dsname);
401                     free(updvals); free(pdp_temp);
402                     free(tmplt);
403                     free(tmpl_idx); rrd_free(&rrd);
404                     fclose(rrd_file); return(-1);
405                 } else {
406                   /* the first element is always the time */
407                   tmpl_idx[tmpl_cnt-1]++; 
408                   /* go to the next entry on the tmplt */
409                   dsname = &tmplt[i+1];
410                   /* fix the damage we did before */
411                   if (i<tmpl_len) {
412                      tmplt[i]=':';
413                   } 
415                 }
416             }       
417         }
418         free(tmplt);
419     }
420     if ((pdp_new = malloc(sizeof(rrd_value_t)
421                           *rrd.stat_head->ds_cnt))==NULL){
422         rrd_set_error("allocating pdp_new ...");
423         free(updvals);
424         free(pdp_temp);
425         free(tmpl_idx);
426         rrd_free(&rrd);
427         fclose(rrd_file);
428         return(-1);
429     }
431 #ifdef HAVE_MMAP
432     rrd_mmaped_file = mmap(0, 
433                         rrd_filesize, 
434                         PROT_READ | PROT_WRITE, 
435                         MAP_SHARED, 
436                         fileno(rrd_file), 
437                         0);
438     if (rrd_mmaped_file == MAP_FAILED) {
439         rrd_set_error("error mmapping file %s", filename);
440         free(updvals);
441         free(pdp_temp);
442         free(tmpl_idx);
443         rrd_free(&rrd);
444         fclose(rrd_file);
445         return(-1);
446     }
447 #endif
448     /* loop through the arguments. */
449     for(arg_i=0; arg_i<argc;arg_i++) {
450         char *stepper = malloc((strlen(argv[arg_i])+1)*sizeof(char));
451         char *step_start = stepper;
452         char *p;
453         char *parsetime_error = NULL;
454         enum {atstyle, normal} timesyntax;
455         struct rrd_time_value ds_tv;
456         if (stepper == NULL){
457                 rrd_set_error("failed duplication argv entry");
458                 free(updvals);
459                 free(pdp_temp);  
460                 free(tmpl_idx);
461                 rrd_free(&rrd);
462 #ifdef HAVE_MMAP
463                 munmap(rrd_mmaped_file, rrd_filesize);
464 #endif
465                 fclose(rrd_file);
466                 return(-1);
467          }
468         /* initialize all ds input to unknown except the first one
469            which has always got to be set */
470         for(ii=1;ii<=rrd.stat_head->ds_cnt;ii++) updvals[ii] = "U";
471         strcpy(stepper,argv[arg_i]);
472         updvals[0]=stepper;
473         /* separate all ds elements; first must be examined separately
474            due to alternate time syntax */
475         if ((p=strchr(stepper,'@'))!=NULL) {
476             timesyntax = atstyle;
477             *p = '\0';
478             stepper = p+1;
479         } else if ((p=strchr(stepper,':'))!=NULL) {
480             timesyntax = normal;
481             *p = '\0';
482             stepper = p+1;
483         } else {
484             rrd_set_error("expected timestamp not found in data source from %s:...",
485                           argv[arg_i]);
486             free(step_start);
487             break;
488         }
489         ii=1;
490         updvals[tmpl_idx[ii]] = stepper;
491         while (*stepper) {
492             if (*stepper == ':') {
493                 *stepper = '\0';
494                 ii++;
495                 if (ii<tmpl_cnt){                   
496                     updvals[tmpl_idx[ii]] = stepper+1;
497                 }
498             }
499             stepper++;
500         }
502         if (ii != tmpl_cnt-1) {
503             rrd_set_error("expected %lu data source readings (got %lu) from %s:...",
504                           tmpl_cnt-1, ii, argv[arg_i]);
505             free(step_start);
506             break;
507         }
508         
509         /* get the time from the reading ... handle N */
510         if (timesyntax == atstyle) {
511             if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
512                 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error );
513                 free(step_start);
514                 break;
515             }
516             if (ds_tv.type == RELATIVE_TO_END_TIME ||
517                 ds_tv.type == RELATIVE_TO_START_TIME) {
518                 rrd_set_error("specifying time relative to the 'start' "
519                               "or 'end' makes no sense here: %s",
520                               updvals[0]);
521                 free(step_start);
522                 break;
523             }
525             current_time = mktime(&ds_tv.tm) + ds_tv.offset;
526             current_time_usec = 0; /* FIXME: how to handle usecs here ? */
527             
528         } else if (strcmp(updvals[0],"N")==0){
529             gettimeofday(&tmp_time, 0);
530             normalize_time(&tmp_time);
531             current_time = tmp_time.tv_sec;
532             current_time_usec = tmp_time.tv_usec;
533         } else {
534             double tmp;
535             tmp = strtod(updvals[0], 0);
536             current_time = floor(tmp);
537             current_time_usec = (long)((tmp-(double)current_time) * 1000000.0);
538         }
539         /* dont do any correction for old version RRDs */
540         if(version < 3) 
541             current_time_usec = 0;
542         
543         if(current_time < rrd.live_head->last_up || 
544           (current_time == rrd.live_head->last_up && 
545            (long)current_time_usec <= (long)rrd.live_head->last_up_usec)) {
546             rrd_set_error("illegal attempt to update using time %ld when "
547                           "last update time is %ld (minimum one second step)",
548                           current_time, rrd.live_head->last_up);
549             free(step_start);
550             break;
551         }
552         
553         
554         /* seek to the beginning of the rra's */
555         if (rra_current != rra_begin) {
556 #ifndef HAVE_MMAP
557             if(fseek(rrd_file, rra_begin, SEEK_SET) != 0) {
558                 rrd_set_error("seek error in rrd");
559                 free(step_start);
560                 break;
561             }
562 #endif
563             rra_current = rra_begin;
564         }
565         rra_start = rra_begin;
567         /* when was the current pdp started */
568         proc_pdp_age = rrd.live_head->last_up % rrd.stat_head->pdp_step;
569         proc_pdp_st = rrd.live_head->last_up - proc_pdp_age;
571         /* when did the last pdp_st occur */
572         occu_pdp_age = current_time % rrd.stat_head->pdp_step;
573         occu_pdp_st = current_time - occu_pdp_age;
575         /* interval = current_time - rrd.live_head->last_up; */
576         interval    = (double)(current_time - rrd.live_head->last_up) 
577                     + (double)((long)current_time_usec - (long)rrd.live_head->last_up_usec)/1000000.0;
579         if (occu_pdp_st > proc_pdp_st){
580             /* OK we passed the pdp_st moment*/
581             pre_int =  (long)occu_pdp_st - rrd.live_head->last_up; /* how much of the input data
582                                                               * occurred before the latest
583                                                               * pdp_st moment*/
584             pre_int -= ((double)rrd.live_head->last_up_usec)/1000000.0; /* adjust usecs */
585             post_int = occu_pdp_age;                         /* how much after it */
586             post_int += ((double)current_time_usec)/1000000.0;  /* adjust usecs */
587         } else {
588             pre_int = interval;
589             post_int = 0;
590         }
592 #ifdef DEBUG
593         printf(
594                "proc_pdp_age %lu\t"
595                "proc_pdp_st %lu\t" 
596                "occu_pfp_age %lu\t" 
597                "occu_pdp_st %lu\t"
598                "int %lf\t"
599                "pre_int %lf\t"
600                "post_int %lf\n", proc_pdp_age, proc_pdp_st, 
601                 occu_pdp_age, occu_pdp_st,
602                interval, pre_int, post_int);
603 #endif
604     
605         /* process the data sources and update the pdp_prep 
606          * area accordingly */
607         for(i=0;i<rrd.stat_head->ds_cnt;i++){
608             enum dst_en dst_idx;
609             dst_idx= dst_conv(rrd.ds_def[i].dst);
611             /* make sure we do not build diffs with old last_ds values */
612             if(rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt < interval 
613                 && ( dst_idx == DST_COUNTER || dst_idx == DST_DERIVE)){
614                 strncpy(rrd.pdp_prep[i].last_ds,"U",LAST_DS_LEN-1);
615             }
617             /* NOTE: DST_CDEF should never enter this if block, because
618              * updvals[i+1][0] is initialized to 'U'; unless the caller
619              * accidently specified a value for the DST_CDEF. To handle 
620               * this case, an extra check is required. */
622             if((updvals[i+1][0] != 'U') &&
623                    (dst_idx != DST_CDEF) &&
624                rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {
625                double rate = DNAN;
626                /* the data source type defines how to process the data */
627                 /* pdp_new contains rate * time ... eg the bytes
628                  * transferred during the interval. Doing it this way saves
629                  * a lot of math operations */
630                 
632                 switch(dst_idx){
633                 case DST_COUNTER:
634                 case DST_DERIVE:
635                     if(rrd.pdp_prep[i].last_ds[0] != 'U'){
636                       for(ii=0;updvals[i+1][ii] != '\0';ii++){
637                             if(updvals[i+1][ii] < '0' || updvals[i+1][ii] > '9' || (ii==0 && updvals[i+1][ii] == '-')){
638                                  rrd_set_error("not a simple integer: '%s'",updvals[i+1]);
639                                  break;
640                             }
641                        }
642                        if (rrd_test_error()){
643                             break;
644                        }
645                        pdp_new[i]= rrd_diff(updvals[i+1],rrd.pdp_prep[i].last_ds);
646                        if(dst_idx == DST_COUNTER) {
647                           /* simple overflow catcher suggested by Andres Kroonmaa */
648                           /* this will fail terribly for non 32 or 64 bit counters ... */
649                           /* are there any others in SNMP land ? */
650                           if (pdp_new[i] < (double)0.0 ) 
651                             pdp_new[i] += (double)4294967296.0 ;  /* 2^32 */
652                           if (pdp_new[i] < (double)0.0 ) 
653                             pdp_new[i] += (double)18446744069414584320.0; /* 2^64-2^32 */;
654                        }
655                        rate = pdp_new[i] / interval;
656                     }
657                    else {
658                      pdp_new[i]= DNAN;          
659                    }
660                    break;
661                 case DST_ABSOLUTE:
662                     errno = 0;
663                     pdp_new[i] = strtod(updvals[i+1],&endptr);
664                     if (errno > 0){
665                         rrd_set_error("converting  '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
666                         break;
667                     };
668                     if (endptr[0] != '\0'){
669                         rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
670                         break;
671                     }
672                     rate = pdp_new[i] / interval;                 
673                     break;
674                 case DST_GAUGE:
675                     errno = 0;
676                     pdp_new[i] = strtod(updvals[i+1],&endptr) * interval;
677                     if (errno > 0){
678                         rrd_set_error("converting  '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
679                         break;
680                     };
681                     if (endptr[0] != '\0'){
682                         rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
683                         break;
684                     }
685                     rate = pdp_new[i] / interval;                  
686                     break;
687                 default:
688                     rrd_set_error("rrd contains unknown DS type : '%s'",
689                                   rrd.ds_def[i].dst);
690                     break;
691                 }
692                 /* break out of this for loop if the error string is set */
693                 if (rrd_test_error()){
694                     break;
695                 }
696                /* make sure pdp_temp is neither too large or too small
697                 * if any of these occur it becomes unknown ...
698                 * sorry folks ... */
699                if ( ! isnan(rate) && 
700                     (( ! isnan(rrd.ds_def[i].par[DS_max_val].u_val) &&
701                          rate > rrd.ds_def[i].par[DS_max_val].u_val ) ||     
702                     ( ! isnan(rrd.ds_def[i].par[DS_min_val].u_val) &&
703                         rate < rrd.ds_def[i].par[DS_min_val].u_val ))){
704                   pdp_new[i] = DNAN;
705                }               
706             } else {
707                 /* no news is news all the same */
708                 pdp_new[i] = DNAN;
709             }
710             
711             /* make a copy of the command line argument for the next run */
712 #ifdef DEBUG
713             fprintf(stderr,
714                     "prep ds[%lu]\t"
715                     "last_arg '%s'\t"
716                     "this_arg '%s'\t"
717                     "pdp_new %10.2f\n",
718                     i,
719                     rrd.pdp_prep[i].last_ds,
720                     updvals[i+1], pdp_new[i]);
721 #endif
722             if(dst_idx == DST_COUNTER || dst_idx == DST_DERIVE){
723                 strncpy(rrd.pdp_prep[i].last_ds,
724                         updvals[i+1],LAST_DS_LEN-1);
725                 rrd.pdp_prep[i].last_ds[LAST_DS_LEN-1]='\0';
726             }
727         }
728         /* break out of the argument parsing loop if the error_string is set */
729         if (rrd_test_error()){
730             free(step_start);
731             break;
732         }
733         /* has a pdp_st moment occurred since the last run ? */
735         if (proc_pdp_st == occu_pdp_st){
736             /* no we have not passed a pdp_st moment. therefore update is simple */
738             for(i=0;i<rrd.stat_head->ds_cnt;i++){
739                 if(isnan(pdp_new[i]))
740                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += floor(interval+0.5);
741                 else {
742                      if (isnan( rrd.pdp_prep[i].scratch[PDP_val].u_val )){
743                         rrd.pdp_prep[i].scratch[PDP_val].u_val= pdp_new[i];
744                      } else {
745                         rrd.pdp_prep[i].scratch[PDP_val].u_val+= pdp_new[i];
746                      }
747                 }
748 #ifdef DEBUG
749                 fprintf(stderr,
750                         "NO PDP  ds[%lu]\t"
751                         "value %10.2f\t"
752                         "unkn_sec %5lu\n",
753                         i,
754                         rrd.pdp_prep[i].scratch[PDP_val].u_val,
755                         rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
756 #endif
757             }   
758         } else {
759             /* an pdp_st has occurred. */
761             /* in pdp_prep[].scratch[PDP_val].u_val we have collected rate*seconds which 
762              * occurred up to the last run.        
763             pdp_new[] contains rate*seconds from the latest run.
764             pdp_temp[] will contain the rate for cdp */
766             for(i=0;i<rrd.stat_head->ds_cnt;i++){
767                 /* update pdp_prep to the current pdp_st. */
768                 
769                 if(isnan(pdp_new[i]))
770                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += floor(pre_int+0.5);
771                 else {
772                      if (isnan( rrd.pdp_prep[i].scratch[PDP_val].u_val )){
773                         rrd.pdp_prep[i].scratch[PDP_val].u_val=         pdp_new[i]/interval*pre_int;
774                      } else {
775                         rrd.pdp_prep[i].scratch[PDP_val].u_val+= pdp_new[i]/interval*pre_int;
776                      }
777                  }
778                 
780                 /* if too much of the pdp_prep is unknown we dump it */
781                 if ( 
782                     /* removed because this does not agree with the definition
783                        a heart beat can be unknown */
784                     /* (rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt 
785                      > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) || */
786                     /* if the interval is larger thatn mrhb we get NAN */
787                     (interval > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) ||
788                     (occu_pdp_st-proc_pdp_st <= 
789                      rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)) {
790                     pdp_temp[i] = DNAN;
791                 } else {
792                     pdp_temp[i] = rrd.pdp_prep[i].scratch[PDP_val].u_val
793                         / (double)( occu_pdp_st
794                                     - proc_pdp_st
795                                     - rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
796                 }
798                 /* process CDEF data sources; remember each CDEF DS can
799                  * only reference other DS with a lower index number */
800             if (dst_conv(rrd.ds_def[i].dst) == DST_CDEF) {
801                    rpnp_t *rpnp;
802                    rpnp = rpn_expand((rpn_cdefds_t *) &(rrd.ds_def[i].par[DS_cdef]));
803                    /* substitue data values for OP_VARIABLE nodes */
804                    for (ii = 0; rpnp[ii].op != OP_END; ii++)
805                    {
806                           if (rpnp[ii].op == OP_VARIABLE) {
807                                  rpnp[ii].op = OP_NUMBER;
808                                  rpnp[ii].val =  pdp_temp[rpnp[ii].ptr];
809                           }
810                    }
811                    /* run the rpn calculator */
812                    if (rpn_calc(rpnp,&rpnstack,0,pdp_temp,i) == -1) {
813                           free(rpnp);
814                           break; /* exits the data sources pdp_temp loop */
815                    }
816                 }
817         
818                 /* make pdp_prep ready for the next run */
819                 if(isnan(pdp_new[i])){
820                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = floor(post_int + 0.5);
821                     rrd.pdp_prep[i].scratch[PDP_val].u_val = DNAN;
822                 } else {
823                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = 0;
824                     rrd.pdp_prep[i].scratch[PDP_val].u_val = 
825                         pdp_new[i]/interval*post_int;
826                 }
828 #ifdef DEBUG
829                 fprintf(stderr,
830                         "PDP UPD ds[%lu]\t"
831                         "pdp_temp %10.2f\t"
832                         "new_prep %10.2f\t"
833                         "new_unkn_sec %5lu\n",
834                         i, pdp_temp[i],
835                         rrd.pdp_prep[i].scratch[PDP_val].u_val,
836                         rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
837 #endif
838             }
840                 /* if there were errors during the last loop, bail out here */
841             if (rrd_test_error()){
842                free(step_start);
843                break;
844             }
846                 /* compute the number of elapsed pdp_st moments */
847                 elapsed_pdp_st = (occu_pdp_st - proc_pdp_st) / rrd.stat_head -> pdp_step;
848 #ifdef DEBUG
849                 fprintf(stderr,"elapsed PDP steps: %lu\n", elapsed_pdp_st);
850 #endif
851                 if (rra_step_cnt == NULL)
852                 {
853                    rra_step_cnt = (unsigned long *) 
854                           malloc((rrd.stat_head->rra_cnt)* sizeof(unsigned long));
855                 }
857             for(i = 0, rra_start = rra_begin;
858                 i < rrd.stat_head->rra_cnt;
859             rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
860                 i++)
861                 {
862                 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
863                 start_pdp_offset = rrd.rra_def[i].pdp_cnt -
864                    (proc_pdp_st / rrd.stat_head -> pdp_step) % rrd.rra_def[i].pdp_cnt;
865         if (start_pdp_offset <= elapsed_pdp_st) {
866            rra_step_cnt[i] = (elapsed_pdp_st - start_pdp_offset) / 
867                       rrd.rra_def[i].pdp_cnt + 1;
868             } else {
869                    rra_step_cnt[i] = 0;
870                 }
872                 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) 
873                 {
874                    /* If this is a bulk update, we need to skip ahead in the seasonal
875                         * arrays so that they will be correct for the next observed value;
876                         * note that for the bulk update itself, no update will occur to
877                         * DEVSEASONAL or SEASONAL; futhermore, HWPREDICT and DEVPREDICT will
878                         * be set to DNAN. */
879            if (rra_step_cnt[i] > 2) 
880                    {
881                           /* skip update by resetting rra_step_cnt[i],
882                            * note that this is not data source specific; this is due
883                            * to the bulk update, not a DNAN value for the specific data
884                            * source. */
885                           rra_step_cnt[i] = 0;
886               lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st, 
887                              &last_seasonal_coef);
888                       lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st + 1,
889                              &seasonal_coef);
890                    }
891                 
892                   /* periodically run a smoother for seasonal effects */
893                   /* Need to use first cdp parameter buffer to track
894                    * burnin (burnin requires a specific smoothing schedule).
895                    * The CDP_init_seasonal parameter is really an RRA level,
896                    * not a data source within RRA level parameter, but the rra_def
897                    * is read only for rrd_update (not flushed to disk). */
898                   iii = i*(rrd.stat_head -> ds_cnt);
899                   if (rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt 
900                           <= BURNIN_CYCLES)
901                   {
902                      if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st 
903                                  > rrd.rra_def[i].row_cnt - 1) {
904                            /* mark off one of the burnin cycles */
905                            ++(rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt);
906                        schedule_smooth = 1;
907                          }  
908                   } else {
909                          /* someone has no doubt invented a trick to deal with this
910                           * wrap around, but at least this code is clear. */
911                          if (rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt >
912                              rrd.rra_ptr[i].cur_row)
913                          {
914                                  /* here elapsed_pdp_st = rra_step_cnt[i] because of 1-1
915                                   * mapping between PDP and CDP */
916                                  if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
917                                         >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
918                                  {
919 #ifdef DEBUG
920                                         fprintf(stderr,
921                                         "schedule_smooth 1: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
922                     rrd.rra_ptr[i].cur_row, elapsed_pdp_st, 
923                                         rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
924 #endif
925                                         schedule_smooth = 1;
926                                  }
927              } else {
928                                  /* can't rely on negative numbers because we are working with
929                                   * unsigned values */
930                                  /* Don't need modulus here. If we've wrapped more than once, only
931                                   * one smooth is executed at the end. */
932                                  if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st >= rrd.rra_def[i].row_cnt
933                                         && rrd.rra_ptr[i].cur_row + elapsed_pdp_st - rrd.rra_def[i].row_cnt
934                                         >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
935                                  {
936 #ifdef DEBUG
937                                         fprintf(stderr,
938                                         "schedule_smooth 2: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
939                     rrd.rra_ptr[i].cur_row, elapsed_pdp_st, 
940                                         rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
941 #endif
942                                         schedule_smooth = 1;
943                                  }
944                          }
945                   }
947               rra_current = ftell(rrd_file); 
948                 } /* if cf is DEVSEASONAL or SEASONAL */
950         if (rrd_test_error()) break;
952                     /* update CDP_PREP areas */
953                     /* loop over data soures within each RRA */
954                     for(ii = 0;
955                         ii < rrd.stat_head->ds_cnt;
956                         ii++)
957                         {
958                         
959                         /* iii indexes the CDP prep area for this data source within the RRA */
960                         iii=i*rrd.stat_head->ds_cnt+ii;
962                         if (rrd.rra_def[i].pdp_cnt > 1) {
963                           
964                            if (rra_step_cnt[i] > 0) {
965                            /* If we are in this block, as least 1 CDP value will be written to
966                                 * disk, this is the CDP_primary_val entry. If more than 1 value needs
967                                 * to be written, then the "fill in" value is the CDP_secondary_val
968                                 * entry. */
969                                   if (isnan(pdp_temp[ii]))
970                   {
971                                          rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += start_pdp_offset;
972                                          rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
973                                   } else {
974                                          /* CDP_secondary value is the RRA "fill in" value for intermediary
975                                           * CDP data entries. No matter the CF, the value is the same because
976                                           * the average, max, min, and last of a list of identical values is
977                                           * the same, namely, the value itself. */
978                                          rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = pdp_temp[ii];
979                                   }
980                      
981                                   if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt
982                                       > rrd.rra_def[i].pdp_cnt*
983                                       rrd.rra_def[i].par[RRA_cdp_xff_val].u_val)
984                                   {
985                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
986                                          /* initialize carry over */
987                                          if (current_cf == CF_AVERAGE) {
988                                                    if (isnan(pdp_temp[ii])) { 
989                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
990                                                    } else {
991                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
992                                                                  ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
993                                                    }
994                                          } else {
995                                                 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
996                                          }
997                                   } else {
998                                          rrd_value_t cum_val, cur_val; 
999                                      switch (current_cf) {
1000                                                 case CF_AVERAGE:
1001                                                   cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, 0.0);
1002                                                   cur_val = IFDNAN(pdp_temp[ii],0.0);
1003                           rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val =
1004                                                (cum_val + cur_val * start_pdp_offset) /
1005                                            (rrd.rra_def[i].pdp_cnt
1006                                                -rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt);
1007                                                    /* initialize carry over value */
1008                                                    if (isnan(pdp_temp[ii])) { 
1009                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
1010                                                    } else {
1011                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
1012                                                                  ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
1013                                                    }
1014                                                    break;
1015                                                 case CF_MAXIMUM:
1016                                                   cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, -DINF);
1017                                                   cur_val = IFDNAN(pdp_temp[ii],-DINF);
1018 #ifdef DEBUG
1019                                                   if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
1020                                                           isnan(pdp_temp[ii])) {
1021                                                      fprintf(stderr,
1022                                                                 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1023                                                                 i,ii);
1024                                                          exit(-1);
1025                                                   }
1026 #endif
1027                                                   if (cur_val > cum_val)
1028                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
1029                                                   else
1030                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
1031                                                   /* initialize carry over value */
1032                                                   rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1033                                                   break;
1034                                                 case CF_MINIMUM:
1035                                                   cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, DINF);
1036                                                   cur_val = IFDNAN(pdp_temp[ii],DINF);
1037 #ifdef DEBUG
1038                                                   if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
1039                                                           isnan(pdp_temp[ii])) {
1040                                                      fprintf(stderr,
1041                                                                 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1042                                                                 i,ii);
1043                                                          exit(-1);
1044                                                   }
1045 #endif
1046                                                   if (cur_val < cum_val)
1047                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
1048                                                   else
1049                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
1050                                                   /* initialize carry over value */
1051                                                   rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1052                                                   break;
1053                                                 case CF_LAST:
1054                                                 default:
1055                                                    rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = pdp_temp[ii];
1056                                                    /* initialize carry over value */
1057                                                    rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1058                                                 break;
1059                                          }
1060                                   } /* endif meets xff value requirement for a valid value */
1061                                   /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
1062                                    * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1063                                   if (isnan(pdp_temp[ii]))
1064                                          rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 
1065                                                 (elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt;
1066                                   else
1067                                          rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 0;
1068                } else  /* rra_step_cnt[i]  == 0 */
1069                            {
1070 #ifdef DEBUG
1071                                   if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val)) {
1072                                   fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, DNAN\n",
1073                                          i,ii);
1074                                   } else {
1075                                   fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, %10.2f\n",
1076                                          i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1077                                   }
1078 #endif
1079                                   if (isnan(pdp_temp[ii])) {
1080                                  rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += elapsed_pdp_st;
1081                                   } else if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val))
1082                                   {
1083                                          if (current_cf == CF_AVERAGE) {
1084                                             rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
1085                                                    elapsed_pdp_st;
1086                                          } else {
1087                                             rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1088                                          }
1089 #ifdef DEBUG
1090                                          fprintf(stderr,"Initialize CDP_val for RRA %lu DS %lu: %10.2f\n",
1091                                             i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1092 #endif
1093                                   } else {
1094                                          switch (current_cf) {
1095                                          case CF_AVERAGE:
1096                                             rrd.cdp_prep[iii].scratch[CDP_val].u_val += pdp_temp[ii] *
1097                                                    elapsed_pdp_st;
1098                                                 break;
1099                                          case CF_MINIMUM:
1100                                                 if (pdp_temp[ii] < rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1101                                                    rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1102                                                 break; 
1103                                          case CF_MAXIMUM:
1104                                                 if (pdp_temp[ii] > rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1105                                                    rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1106                                                 break; 
1107                                          case CF_LAST:
1108                                          default:
1109                                                 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1110                                                 break;
1111                                          }
1112                                   }
1113                            }
1114                         } else { /* rrd.rra_def[i].pdp_cnt == 1 */
1115                            if (elapsed_pdp_st > 2)
1116                            {
1117                                    switch (current_cf) {
1118                                    case CF_AVERAGE:
1119                                    default:
1120                                   rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val=pdp_temp[ii];
1121                                   rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val=pdp_temp[ii];
1122                                           break;
1123                    case CF_SEASONAL:
1124                                    case CF_DEVSEASONAL:
1125                                           /* need to update cached seasonal values, so they are consistent
1126                                            * with the bulk update */
1127                       /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1128                                            * CDP_last_deviation are the same. */
1129                       rrd.cdp_prep[iii].scratch[CDP_hw_last_seasonal].u_val =
1130                                                  last_seasonal_coef[ii];
1131                                           rrd.cdp_prep[iii].scratch[CDP_hw_seasonal].u_val =
1132                                                  seasonal_coef[ii];
1133                                           break;
1134                    case CF_HWPREDICT:
1135                                           /* need to update the null_count and last_null_count.
1136                                            * even do this for non-DNAN pdp_temp because the
1137                                            * algorithm is not learning from batch updates. */
1138                                           rrd.cdp_prep[iii].scratch[CDP_null_count].u_cnt += 
1139                                                  elapsed_pdp_st;
1140                                           rrd.cdp_prep[iii].scratch[CDP_last_null_count].u_cnt += 
1141                                                  elapsed_pdp_st - 1;
1142                                           /* fall through */
1143                                    case CF_DEVPREDICT:
1144                                   rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
1145                                   rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
1146                                           break;
1147                    case CF_FAILURES:
1148                                           /* do not count missed bulk values as failures */
1149                                   rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = 0;
1150                                   rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = 0;
1151                                           /* need to reset violations buffer.
1152                                            * could do this more carefully, but for now, just
1153                                            * assume a bulk update wipes away all violations. */
1154                       erase_violations(&rrd, iii, i);
1155                                           break;
1156                                    }
1157                            } 
1158                         } /* endif rrd.rra_def[i].pdp_cnt == 1 */
1160                         if (rrd_test_error()) break;
1162                         } /* endif data sources loop */
1163         } /* end RRA Loop */
1165                 /* this loop is only entered if elapsed_pdp_st < 3 */
1166                 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val; 
1167                          j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val)
1168                 {
1169                for(i = 0, rra_start = rra_begin;
1170                    i < rrd.stat_head->rra_cnt;
1171                rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1172                    i++)
1173                    {
1174                           if (rrd.rra_def[i].pdp_cnt > 1) continue;
1176                   current_cf = cf_conv(rrd.rra_def[i].cf_nam);
1177                           if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
1178                           {
1179                          lookup_seasonal(&rrd,i,rra_start,rrd_file,
1180                                     elapsed_pdp_st + (scratch_idx == CDP_primary_val ? 1 : 2),
1181                                 &seasonal_coef);
1182                  rra_current = ftell(rrd_file);
1183                           }
1184                           if (rrd_test_error()) break;
1185                       /* loop over data soures within each RRA */
1186                       for(ii = 0;
1187                           ii < rrd.stat_head->ds_cnt;
1188                           ii++)
1189                           {
1190                              update_aberrant_CF(&rrd,pdp_temp[ii],current_cf,
1191                                         i*(rrd.stat_head->ds_cnt) + ii,i,ii,
1192                                     scratch_idx, seasonal_coef);
1193                           }
1194            } /* end RRA Loop */
1195                    if (rrd_test_error()) break;
1196             } /* end elapsed_pdp_st loop */
1198                 if (rrd_test_error()) break;
1200                 /* Ready to write to disk */
1201                 /* Move sequentially through the file, writing one RRA at a time.
1202                  * Note this architecture divorces the computation of CDP with
1203                  * flushing updated RRA entries to disk. */
1204             for(i = 0, rra_start = rra_begin;
1205                 i < rrd.stat_head->rra_cnt;
1206             rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1207                 i++) {
1208                 /* is there anything to write for this RRA? If not, continue. */
1209         if (rra_step_cnt[i] == 0) continue;
1211                 /* write the first row */
1212 #ifdef DEBUG
1213         fprintf(stderr,"  -- RRA Preseek %ld\n",ftell(rrd_file));
1214 #endif
1215             rrd.rra_ptr[i].cur_row++;
1216             if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
1217                    rrd.rra_ptr[i].cur_row = 0; /* wrap around */
1218                 /* positition on the first row */
1219                 rra_pos_tmp = rra_start +
1220                    (rrd.stat_head->ds_cnt)*(rrd.rra_ptr[i].cur_row)*sizeof(rrd_value_t);
1221                 if(rra_pos_tmp != rra_current) {
1222 #ifndef HAVE_MMAP
1223                    if(fseek(rrd_file, rra_pos_tmp, SEEK_SET) != 0){
1224                       rrd_set_error("seek error in rrd");
1225                       break;
1226                    }
1227 #endif
1228                    rra_current = rra_pos_tmp;
1229                 }
1231 #ifdef DEBUG
1232             fprintf(stderr,"  -- RRA Postseek %ld\n",ftell(rrd_file));
1233 #endif
1234                 scratch_idx = CDP_primary_val;
1235                 if (pcdp_summary != NULL)
1236                 {
1237                    rra_time = (current_time - current_time 
1238                    % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1239                    - ((rra_step_cnt[i]-1)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1240                 }
1241 #ifdef HAVE_MMAP
1242                 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file, 
1243                    pcdp_summary, &rra_time, rrd_mmaped_file);
1244 #else
1245                 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file, 
1246                    pcdp_summary, &rra_time);
1247 #endif
1248                 if (rrd_test_error()) break;
1250                 /* write other rows of the bulk update, if any */
1251                 scratch_idx = CDP_secondary_val;
1252                for ( ; rra_step_cnt[i] > 1; rra_step_cnt[i]--)
1253                 {
1254                   if (++rrd.rra_ptr[i].cur_row == rrd.rra_def[i].row_cnt)
1255                    {
1256 #ifdef DEBUG
1257               fprintf(stderr,"Wraparound for RRA %s, %lu updates left\n",
1258                           rrd.rra_def[i].cf_nam, rra_step_cnt[i] - 1);
1259 #endif
1260                           /* wrap */
1261                           rrd.rra_ptr[i].cur_row = 0;
1262                           /* seek back to beginning of current rra */
1263                       if (fseek(rrd_file, rra_start, SEEK_SET) != 0)
1264                           {
1265                          rrd_set_error("seek error in rrd");
1266                          break;
1267                           }
1268 #ifdef DEBUG
1269                   fprintf(stderr,"  -- Wraparound Postseek %ld\n",ftell(rrd_file));
1270 #endif
1271                           rra_current = rra_start;
1272                    }
1273                    if (pcdp_summary != NULL)
1274                    {
1275                       rra_time = (current_time - current_time 
1276                       % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1277                       - ((rra_step_cnt[i]-2)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1278                    }
1279 #ifdef HAVE_MMAP
1280                    pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1281                       pcdp_summary, &rra_time, rrd_mmaped_file);
1282 #else
1283                    pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1284                       pcdp_summary, &rra_time);
1285 #endif
1286                 }
1287                 
1288                 if (rrd_test_error())
1289                   break;
1290                 } /* RRA LOOP */
1292             /* break out of the argument parsing loop if error_string is set */
1293             if (rrd_test_error()){
1294                    free(step_start);
1295                    break;
1296             } 
1297             
1298         } /* endif a pdp_st has occurred */ 
1299         rrd.live_head->last_up = current_time;
1300         rrd.live_head->last_up_usec = current_time_usec; 
1301         free(step_start);
1302     } /* function argument loop */
1304     if (seasonal_coef != NULL) free(seasonal_coef);
1305     if (last_seasonal_coef != NULL) free(last_seasonal_coef);
1306         if (rra_step_cnt != NULL) free(rra_step_cnt);
1307     rpnstack_free(&rpnstack);
1309 #ifdef HAVE_MMAP
1310     if (munmap(rrd_mmaped_file, rrd_filesize) == -1) {
1311             rrd_set_error("error writing(unmapping) file: %s", filename);
1312     }
1313 #endif    
1314     /* if we got here and if there is an error and if the file has not been
1315      * written to, then close things up and return. */
1316     if (rrd_test_error()) {
1317         free(updvals);
1318         free(tmpl_idx);
1319         rrd_free(&rrd);
1320         free(pdp_temp);
1321         free(pdp_new);
1322         fclose(rrd_file);
1323         return(-1);
1324     }
1326     /* aargh ... that was tough ... so many loops ... anyway, its done.
1327      * we just need to write back the live header portion now*/
1329     if (fseek(rrd_file, (sizeof(stat_head_t)
1330                          + sizeof(ds_def_t)*rrd.stat_head->ds_cnt 
1331                          + sizeof(rra_def_t)*rrd.stat_head->rra_cnt),
1332               SEEK_SET) != 0) {
1333         rrd_set_error("seek rrd for live header writeback");
1334         free(updvals);
1335         free(tmpl_idx);
1336         rrd_free(&rrd);
1337         free(pdp_temp);
1338         free(pdp_new);
1339         fclose(rrd_file);
1340         return(-1);
1341     }
1343     if(version >= 3) {
1344             if(fwrite( rrd.live_head,
1345                        sizeof(live_head_t), 1, rrd_file) != 1){
1346                 rrd_set_error("fwrite live_head to rrd");
1347                 free(updvals);
1348                 rrd_free(&rrd);
1349                 free(tmpl_idx);
1350                 free(pdp_temp);
1351                 free(pdp_new);
1352                 fclose(rrd_file);
1353                 return(-1);
1354             }
1355     }
1356     else {
1357             if(fwrite( &rrd.live_head->last_up,
1358                        sizeof(time_t), 1, rrd_file) != 1){
1359                 rrd_set_error("fwrite live_head to rrd");
1360                 free(updvals);
1361                 rrd_free(&rrd);
1362                 free(tmpl_idx);
1363                 free(pdp_temp);
1364                 free(pdp_new);
1365                 fclose(rrd_file);
1366                 return(-1);
1367             }
1368     }
1369             
1371     if(fwrite( rrd.pdp_prep,
1372                sizeof(pdp_prep_t),
1373                rrd.stat_head->ds_cnt, rrd_file) != rrd.stat_head->ds_cnt){
1374         rrd_set_error("ftwrite pdp_prep to rrd");
1375         free(updvals);
1376         rrd_free(&rrd);
1377         free(tmpl_idx);
1378         free(pdp_temp);
1379         free(pdp_new);
1380         fclose(rrd_file);
1381         return(-1);
1382     }
1384     if(fwrite( rrd.cdp_prep,
1385                sizeof(cdp_prep_t),
1386                rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt, rrd_file) 
1387        != rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt){
1389         rrd_set_error("ftwrite cdp_prep to rrd");
1390         free(updvals);
1391         free(tmpl_idx);
1392         rrd_free(&rrd);
1393         free(pdp_temp);
1394         free(pdp_new);
1395         fclose(rrd_file);
1396         return(-1);
1397     }
1399     if(fwrite( rrd.rra_ptr,
1400                sizeof(rra_ptr_t), 
1401                rrd.stat_head->rra_cnt,rrd_file) != rrd.stat_head->rra_cnt){
1402         rrd_set_error("fwrite rra_ptr to rrd");
1403         free(updvals);
1404         free(tmpl_idx);
1405         rrd_free(&rrd);
1406         free(pdp_temp);
1407         free(pdp_new);
1408         fclose(rrd_file);
1409         return(-1);
1410     }
1412     /* OK now close the files and free the memory */
1413     if(fclose(rrd_file) != 0){
1414         rrd_set_error("closing rrd");
1415         free(updvals);
1416         free(tmpl_idx);
1417         rrd_free(&rrd);
1418         free(pdp_temp);
1419         free(pdp_new);
1420         return(-1);
1421     }
1423     /* calling the smoothing code here guarantees at most
1424          * one smoothing operation per rrd_update call. Unfortunately,
1425          * it is possible with bulk updates, or a long-delayed update
1426          * for smoothing to occur off-schedule. This really isn't
1427          * critical except during the burning cycles. */
1428         if (schedule_smooth)
1429         {
1430           rrd_file = fopen(filename,"rb+");
1431           rra_start = rra_begin;
1432           for (i = 0; i < rrd.stat_head -> rra_cnt; ++i)
1433           {
1434             if (cf_conv(rrd.rra_def[i].cf_nam) == CF_DEVSEASONAL ||
1435                 cf_conv(rrd.rra_def[i].cf_nam) == CF_SEASONAL)
1436             {
1437 #ifdef DEBUG
1438               fprintf(stderr,"Running smoother for rra %ld\n",i);
1439 #endif
1440               apply_smoother(&rrd,i,rra_start,rrd_file);
1441               if (rrd_test_error())
1442                 break;
1443             }
1444             rra_start += rrd.rra_def[i].row_cnt
1445               *rrd.stat_head->ds_cnt*sizeof(rrd_value_t);
1446           }
1447           fclose(rrd_file);
1448         }
1449     rrd_free(&rrd);
1450     free(updvals);
1451     free(tmpl_idx);
1452     free(pdp_new);
1453     free(pdp_temp);
1454     return(0);
1457 /*
1458  * get exclusive lock to whole file.
1459  * lock gets removed when we close the file
1460  *
1461  * returns 0 on success
1462  */
1463 int
1464 LockRRD(FILE *rrdfile)
1466     int rrd_fd;         /* File descriptor for RRD */
1467     int rcstat;
1469     rrd_fd = fileno(rrdfile);
1471         {
1472 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
1473     struct _stat st;
1475     if ( _fstat( rrd_fd, &st ) == 0 ) {
1476             rcstat = _locking ( rrd_fd, _LK_NBLCK, st.st_size );
1477     } else {
1478             rcstat = -1;
1479     }
1480 #else
1481     struct flock        lock;
1482     lock.l_type = F_WRLCK;    /* exclusive write lock */
1483     lock.l_len = 0;           /* whole file */
1484     lock.l_start = 0;         /* start of file */
1485     lock.l_whence = SEEK_SET;   /* end of file */
1487     rcstat = fcntl(rrd_fd, F_SETLK, &lock);
1488 #endif
1489         }
1491     return(rcstat);
1495 #ifdef HAVE_MMAP
1496 info_t
1497 *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1498                unsigned short CDP_scratch_idx, 
1499 #ifndef DEBUG
1500 FILE UNUSED(*rrd_file),
1501 #else
1502 FILE *rrd_file,
1503 #endif
1504                    info_t *pcdp_summary, time_t *rra_time, void *rrd_mmaped_file)
1505 #else
1506 info_t
1507 *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1508                unsigned short CDP_scratch_idx, FILE *rrd_file,
1509                    info_t *pcdp_summary, time_t *rra_time)
1510 #endif
1512    unsigned long ds_idx, cdp_idx;
1513    infoval iv;
1514   
1515    for (ds_idx = 0; ds_idx < rrd -> stat_head -> ds_cnt; ds_idx++)
1516    {
1517       /* compute the cdp index */
1518       cdp_idx =rra_idx * (rrd -> stat_head->ds_cnt) + ds_idx;
1519 #ifdef DEBUG
1520           fprintf(stderr,"  -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1521              rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,ftell(rrd_file),
1522              rrd -> rra_def[rra_idx].cf_nam);
1523 #endif 
1524       if (pcdp_summary != NULL)
1525           {
1526              iv.u_val = rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
1527              /* append info to the return hash */
1528                  pcdp_summary = info_push(pcdp_summary,
1529                  sprintf_alloc("[%d]RRA[%s][%lu]DS[%s]",
1530                  *rra_time, rrd->rra_def[rra_idx].cf_nam, 
1531                  rrd->rra_def[rra_idx].pdp_cnt, rrd->ds_def[ds_idx].ds_nam),
1532          RD_I_VAL, iv);
1533           }
1534 #ifdef HAVE_MMAP
1535           memcpy((char *)rrd_mmaped_file + *rra_current,
1536                           &(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1537                           sizeof(rrd_value_t));
1538 #else
1539           if(fwrite(&(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1540                  sizeof(rrd_value_t),1,rrd_file) != 1)
1541           { 
1542              rrd_set_error("writing rrd");
1543              return 0;
1544           }
1545 #endif
1546           *rra_current += sizeof(rrd_value_t);
1547         }
1548         return (pcdp_summary);