Code

reduce compiler warnings. Many small fixes. -- Mike Slifcak <slif@bellsouth.net>
[rrdtool.git] / src / rrd_update.c
1 /*****************************************************************************
2  * RRDtool 1.1.x  Copyright Tobias Oetiker, 1997 - 2004
3  *****************************************************************************
4  * rrd_update.c  RRD Update Function
5  *****************************************************************************
6  * $Id$
7  * $Log$
8  * Revision 1.17  2004/05/26 22:11:12  oetiker
9  * reduce compiler warnings. Many small fixes. -- Mike Slifcak <slif@bellsouth.net>
10  *
11  * Revision 1.16  2004/05/25 20:52:16  oetiker
12  * fix spelling and syntax, especially in messages that are printed -- Mike Slifcak
13  *
14  * Revision 1.15  2004/05/25 20:51:49  oetiker
15  * Update displayed copyright messages to be consistent. -- Mike Slifcak
16  *
17  * Revision 1.14  2003/11/11 19:46:21  oetiker
18  * replaced time_value with rrd_time_value as MacOS X introduced a struct of that name in their standard headers
19  *
20  * Revision 1.13  2003/11/11 19:38:03  oetiker
21  * rrd files should NOT change size ever ... bulk update code wa buggy.
22  * -- David M. Grimes <dgrimes@navisite.com>
23  *
24  * Revision 1.12  2003/09/04 13:16:12  oetiker
25  * should not assigne but compare ... grrrrr
26  *
27  * Revision 1.11  2003/09/02 21:58:35  oetiker
28  * be pickier about what we accept in rrd_update. Complain if things do not work out
29  *
30  * Revision 1.10  2003/04/29 19:14:12  jake
31  * Change updatev RRA return from index_number to cf_nam, pdp_cnt.
32  * Also revert accidental addition of -I to aclocal MakeMakefile.
33  *
34  * Revision 1.9  2003/04/25 18:35:08  jake
35  * Alternate update interface, updatev. Returns info about CDPs written to disk as result of update. Output format is similar to rrd_info, a hash of key-values.
36  *
37  * Revision 1.8  2003/03/31 21:22:12  oetiker
38  * enables RRDtool updates with microsecond or in case of windows millisecond
39  * precision. This is needed to reduce time measurement error when archive step
40  * is small. (<30s) --  Sasha Mikheev <sasha@avalon-net.co.il>
41  *
42  * Revision 1.7  2003/02/13 07:05:27  oetiker
43  * Find attached the patch I promised to send to you. Please note that there
44  * are three new source files (src/rrd_is_thread_safe.h, src/rrd_thread_safe.c
45  * and src/rrd_not_thread_safe.c) and the introduction of librrd_th. This
46  * library is identical to librrd, but it contains support code for per-thread
47  * global variables currently used for error information only. This is similar
48  * to how errno per-thread variables are implemented.  librrd_th must be linked
49  * alongside of libpthred
50  *
51  * There is also a new file "THREADS", holding some documentation.
52  *
53  * -- Peter Stamfest <peter@stamfest.at>
54  *
55  * Revision 1.6  2002/02/01 20:34:49  oetiker
56  * fixed version number and date/time
57  *
58  * Revision 1.5  2001/05/09 05:31:01  oetiker
59  * Bug fix: when update of multiple PDP/CDP RRAs coincided
60  * with interpolation of multiple PDPs an incorrect value was
61  * stored as the CDP. Especially evident for GAUGE data sources.
62  * Minor changes to rrdcreate.pod. -- Jake Brutlag <jakeb@corp.webtv.net>
63  *
64  * Revision 1.4  2001/03/10 23:54:41  oetiker
65  * Support for COMPUTE data sources (CDEF data sources). Removes the RPN
66  * parser and calculator from rrd_graph and puts then in a new file,
67  * rrd_rpncalc.c. Changes to core files rrd_create and rrd_update. Some
68  * clean-up of aberrant behavior stuff, including a bug fix.
69  * Documentation update (rrdcreate.pod, rrdupdate.pod). Change xml format.
70  * -- Jake Brutlag <jakeb@corp.webtv.net>
71  *
72  * Revision 1.3  2001/03/04 13:01:55  oetiker
73  * Aberrant Behavior Detection support. A brief overview added to rrdtool.pod.
74  * Major updates to rrd_update.c, rrd_create.c. Minor update to other core files.
75  * This is backwards compatible! But new files using the Aberrant stuff are not readable
76  * by old rrdtool versions. See http://cricket.sourceforge.net/aberrant/rrd_hw.htm
77  * -- Jake Brutlag <jakeb@corp.webtv.net>
78  *
79  * Revision 1.2  2001/03/04 11:14:25  oetiker
80  * added at-style-time@value:value syntax to rrd_update
81  * --  Dave Bodenstab <imdave@mcs.net>
82  *
83  * Revision 1.1.1.1  2001/02/25 22:25:06  oetiker
84  * checkin
85  *
86  *****************************************************************************/
88 #include "rrd_tool.h"
89 #include <sys/types.h>
90 #include <fcntl.h>
92 #ifdef WIN32
93  #include <sys/locking.h>
94  #include <sys/stat.h>
95  #include <io.h>
96 #endif
98 #include "rrd_hw.h"
99 #include "rrd_rpncalc.h"
101 #include "rrd_is_thread_safe.h"
103 #ifdef WIN32
104 /*
105  * WIN32 does not have gettimeofday     and struct timeval. This is a quick and dirty
106  * replacement.
107  */
108 #include <sys/timeb.h>
110 struct timeval {
111         time_t tv_sec; /* seconds */
112         long tv_usec;  /* microseconds */
113 };
115 struct __timezone {
116         int  tz_minuteswest; /* minutes W of Greenwich */
117         int  tz_dsttime;     /* type of dst correction */
118 };
120 static gettimeofday(struct timeval *t, struct __timezone *tz) {
121         
122         struct timeb current_time;
124         _ftime(&current_time);
125         
126         t->tv_sec  = current_time.time;
127         t->tv_usec = current_time.millitm * 1000;
130 #endif
131 /*
132  * normilize time as returned by gettimeofday. usec part must
133  * be always >= 0
134  */
135 static void normalize_time(struct timeval *t)
137         if(t->tv_usec < 0) {
138                 t->tv_sec--;
139                 t->tv_usec += 1000000L;
140         }
143 /* Local prototypes */
144 int LockRRD(FILE *rrd_file);
145 info_t *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, 
146                                         unsigned long *rra_current,
147                                         unsigned short CDP_scratch_idx, FILE *rrd_file,
148                                         info_t *pcdp_summary, time_t *rra_time);
149 int rrd_update_r(char *filename, char *template, int argc, char **argv);
150 int _rrd_update(char *filename, char *template, int argc, char **argv, 
151                                         info_t*);
153 #define IFDNAN(X,Y) (isnan(X) ? (Y) : (X));
156 #ifdef STANDALONE
157 int 
158 main(int argc, char **argv){
159         rrd_update(argc,argv);
160         if (rrd_test_error()) {
161                 printf("RRDtool 1.1.x  Copyright (C) 1997-2004 by Tobias Oetiker <tobi@oetiker.ch>\n\n"
162                         "Usage: rrdupdate filename\n"
163                         "\t\t\t[--template|-t ds-name:ds-name:...]\n"
164                         "\t\t\ttime|N:value[:value...]\n\n"
165                         "\t\t\tat-time@value[:value...]\n\n"
166                         "\t\t\t[ time:value[:value...] ..]\n\n");
167                                    
168                 printf("ERROR: %s\n",rrd_get_error());
169                 rrd_clear_error();                                                            
170                 return 1;
171         }
172         return 0;
174 #endif
176 info_t *rrd_update_v(int argc, char **argv)
178     char             *template = NULL;          
179         info_t *result = NULL;
180         infoval rc;
182     while (1) {
183                 static struct option long_options[] =
184                         {
185                                 {"template",      required_argument, 0, 't'},
186                                 {0,0,0,0}
187                         };
188                 int option_index = 0;
189                 int opt;
190                 opt = getopt_long(argc, argv, "t:", 
191                                                   long_options, &option_index);
192                 
193                 if (opt == EOF)
194                         break;
195                 
196                 switch(opt) {
197                 case 't':
198                         template = optarg;
199                         break;
200                 
201                 case '?':
202                         rrd_set_error("unknown option '%s'",argv[optind-1]);
203             rc.u_int = -1;
204                         goto end_tag;
205                 }
206     }
208     /* need at least 2 arguments: filename, data. */
209     if (argc-optind < 2) {
210                 rrd_set_error("Not enough arguments");
211         rc.u_int = -1;
212                 goto end_tag;
213     }
214     result = info_push(NULL,sprintf_alloc("return_value"),RD_I_INT,rc);
215         rc.u_int = _rrd_update(argv[optind], template,
216                       argc - optind - 1, argv + optind + 1, result);
217     result->value.u_int = rc.u_int;
218 end_tag:
219     return result;
222 int
223 rrd_update(int argc, char **argv)
225     char             *template = NULL;          
226     int rc;
228     while (1) {
229                 static struct option long_options[] =
230                         {
231                                 {"template",      required_argument, 0, 't'},
232                                 {0,0,0,0}
233                         };
234                 int option_index = 0;
235                 int opt;
236                 opt = getopt_long(argc, argv, "t:", 
237                                                   long_options, &option_index);
238                 
239                 if (opt == EOF)
240                         break;
241                 
242                 switch(opt) {
243                 case 't':
244                         template = optarg;
245                         break;
246                 
247                 case '?':
248                         rrd_set_error("unknown option '%s'",argv[optind-1]);
249                         return(-1);
250                 }
251     }
253     /* need at least 2 arguments: filename, data. */
254     if (argc-optind < 2) {
255                 rrd_set_error("Not enough arguments");
257                 return -1;
258     }
259  
260         rc = rrd_update_r(argv[optind], template,
261                       argc - optind - 1, argv + optind + 1);
262     return rc;
265 int
266 rrd_update_r(char *filename, char *template, int argc, char **argv)
268    return _rrd_update(filename, template, argc, argv, NULL);
271 int
272 _rrd_update(char *filename, char *template, int argc, char **argv, 
273    info_t *pcdp_summary)
276     int              arg_i = 2;
277     short            j;
278     unsigned long    i,ii,iii=1;
280     unsigned long    rra_begin;          /* byte pointer to the rra
281                                           * area in the rrd file.  this
282                                           * pointer never changes value */
283     unsigned long    rra_start;          /* byte pointer to the rra
284                                           * area in the rrd file.  this
285                                           * pointer changes as each rrd is
286                                           * processed. */
287     unsigned long    rra_current;        /* byte pointer to the current write
288                                           * spot in the rrd file. */
289     unsigned long    rra_pos_tmp;        /* temporary byte pointer. */
290     double           interval,
291         pre_int,post_int;                /* interval between this and
292                                           * the last run */
293     unsigned long    proc_pdp_st;        /* which pdp_st was the last
294                                           * to be processed */
295     unsigned long    occu_pdp_st;        /* when was the pdp_st
296                                           * before the last update
297                                           * time */
298     unsigned long    proc_pdp_age;       /* how old was the data in
299                                           * the pdp prep area when it
300                                           * was last updated */
301     unsigned long    occu_pdp_age;       /* how long ago was the last
302                                           * pdp_step time */
303     rrd_value_t      *pdp_new;           /* prepare the incoming data
304                                           * to be added the the
305                                           * existing entry */
306     rrd_value_t      *pdp_temp;          /* prepare the pdp values 
307                                           * to be added the the
308                                           * cdp values */
310     long             *tmpl_idx;          /* index representing the settings
311                                             transported by the template index */
312     unsigned long    tmpl_cnt = 2;       /* time and data */
314     FILE             *rrd_file;
315     rrd_t            rrd;
316     time_t           current_time;
317         time_t           rra_time; /* time of update for a RRA */
318     unsigned long    current_time_usec;  /* microseconds part of current time */
319     struct timeval   tmp_time;           /* used for time conversion */
321     char             **updvals;
322     int              schedule_smooth = 0;
323         rrd_value_t      *seasonal_coef = NULL, *last_seasonal_coef = NULL;
324                                          /* a vector of future Holt-Winters seasonal coefs */
325     unsigned long    elapsed_pdp_st;
326                                          /* number of elapsed PDP steps since last update */
327     unsigned long    *rra_step_cnt = NULL;
328                                          /* number of rows to be updated in an RRA for a data
329                                           * value. */
330     unsigned long    start_pdp_offset;
331                                          /* number of PDP steps since the last update that
332                                           * are assigned to the first CDP to be generated
333                                           * since the last update. */
334     unsigned short   scratch_idx;
335                                          /* index into the CDP scratch array */
336     enum cf_en       current_cf;
337                                          /* numeric id of the current consolidation function */
338     rpnstack_t       rpnstack; /* used for COMPUTE DS */
339     int              version;  /* rrd version */
340     char             *endptr; /* used in the conversion */
342     rpnstack_init(&rpnstack);
344     /* need at least 1 arguments: data. */
345     if (argc < 1) {
346         rrd_set_error("Not enough arguments");
347         return -1;
348     }
349     
350     
352     if(rrd_open(filename,&rrd_file,&rrd, RRD_READWRITE)==-1){
353         return -1;
354     }
355     /* initialize time */
356     version = atoi(rrd.stat_head->version);
357     gettimeofday(&tmp_time, 0);
358     normalize_time(&tmp_time);
359     current_time = tmp_time.tv_sec;
360     if(version >= 3) {
361         current_time_usec = tmp_time.tv_usec;
362     }
363     else {
364         current_time_usec = 0;
365     }
367     rra_current = rra_start = rra_begin = ftell(rrd_file);
368     /* This is defined in the ANSI C standard, section 7.9.5.3:
370         When a file is opened with udpate mode ('+' as the second
371         or third character in the ... list of mode argument
372         variables), both input and ouptut may be performed on the
373         associated stream.  However, ...  input may not be directly
374         followed by output without an intervening call to a file
375         positioning function, unless the input oepration encounters
376         end-of-file. */
377     fseek(rrd_file, 0, SEEK_CUR);
379     
380     /* get exclusive lock to whole file.
381      * lock gets removed when we close the file.
382      */
383     if (LockRRD(rrd_file) != 0) {
384       rrd_set_error("could not lock RRD");
385       rrd_free(&rrd);
386       fclose(rrd_file);
387       return(-1);   
388     } 
390     if((updvals = malloc( sizeof(char*) * (rrd.stat_head->ds_cnt+1)))==NULL){
391         rrd_set_error("allocating updvals pointer array");
392         rrd_free(&rrd);
393         fclose(rrd_file);
394         return(-1);
395     }
397     if ((pdp_temp = malloc(sizeof(rrd_value_t)
398                            *rrd.stat_head->ds_cnt))==NULL){
399         rrd_set_error("allocating pdp_temp ...");
400         free(updvals);
401         rrd_free(&rrd);
402         fclose(rrd_file);
403         return(-1);
404     }
406     if ((tmpl_idx = malloc(sizeof(unsigned long)
407                            *(rrd.stat_head->ds_cnt+1)))==NULL){
408         rrd_set_error("allocating tmpl_idx ...");
409         free(pdp_temp);
410         free(updvals);
411         rrd_free(&rrd);
412         fclose(rrd_file);
413         return(-1);
414     }
415     /* initialize template redirector */
416     /* default config example (assume DS 1 is a CDEF DS)
417        tmpl_idx[0] -> 0; (time)
418        tmpl_idx[1] -> 1; (DS 0)
419        tmpl_idx[2] -> 3; (DS 2)
420        tmpl_idx[3] -> 4; (DS 3) */
421     tmpl_idx[0] = 0; /* time */
422     for (i = 1, ii = 1 ; i <= rrd.stat_head->ds_cnt ; i++) 
423         {
424            if (dst_conv(rrd.ds_def[i-1].dst) != DST_CDEF)
425               tmpl_idx[ii++]=i;
426         }
427     tmpl_cnt= ii;
429     if (template) {
430         char *dsname;
431         unsigned int tmpl_len;
432         dsname = template;
433         tmpl_cnt = 1; /* the first entry is the time */
434         tmpl_len = strlen(template);
435         for(i=0;i<=tmpl_len ;i++) {
436             if (template[i] == ':' || template[i] == '\0') {
437                 template[i] = '\0';
438                 if (tmpl_cnt>rrd.stat_head->ds_cnt){
439                     rrd_set_error("Template contains more DS definitions than RRD");
440                     free(updvals); free(pdp_temp);
441                     free(tmpl_idx); rrd_free(&rrd);
442                     fclose(rrd_file); return(-1);
443                 }
444                 if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd,dsname)) == -1){
445                     rrd_set_error("unknown DS name '%s'",dsname);
446                     free(updvals); free(pdp_temp);
447                     free(tmpl_idx); rrd_free(&rrd);
448                     fclose(rrd_file); return(-1);
449                 } else {
450                   /* the first element is always the time */
451                   tmpl_idx[tmpl_cnt-1]++; 
452                   /* go to the next entry on the template */
453                   dsname = &template[i+1];
454                   /* fix the damage we did before */
455                   if (i<tmpl_len) {
456                      template[i]=':';
457                   } 
459                 }
460             }       
461         }
462     }
463     if ((pdp_new = malloc(sizeof(rrd_value_t)
464                           *rrd.stat_head->ds_cnt))==NULL){
465         rrd_set_error("allocating pdp_new ...");
466         free(updvals);
467         free(pdp_temp);
468         free(tmpl_idx);
469         rrd_free(&rrd);
470         fclose(rrd_file);
471         return(-1);
472     }
474     /* loop through the arguments. */
475     for(arg_i=0; arg_i<argc;arg_i++) {
476         char *stepper = malloc((strlen(argv[arg_i])+1)*sizeof(char));
477         char *step_start = stepper;
478         char *p;
479         char *parsetime_error = NULL;
480         enum {atstyle, normal} timesyntax;
481         struct rrd_time_value ds_tv;
482         if (stepper == NULL){
483                 rrd_set_error("failed duplication argv entry");
484                 free(updvals);
485                 free(pdp_temp);  
486                 free(tmpl_idx);
487                 rrd_free(&rrd);
488                 fclose(rrd_file);
489                 return(-1);
490          }
491         /* initialize all ds input to unknown except the first one
492            which has always got to be set */
493         for(ii=1;ii<=rrd.stat_head->ds_cnt;ii++) updvals[ii] = "U";
494         strcpy(stepper,argv[arg_i]);
495         updvals[0]=stepper;
496         /* separate all ds elements; first must be examined separately
497            due to alternate time syntax */
498         if ((p=strchr(stepper,'@'))!=NULL) {
499             timesyntax = atstyle;
500             *p = '\0';
501             stepper = p+1;
502         } else if ((p=strchr(stepper,':'))!=NULL) {
503             timesyntax = normal;
504             *p = '\0';
505             stepper = p+1;
506         } else {
507             rrd_set_error("expected timestamp not found in data source from %s:...",
508                           argv[arg_i]);
509             free(step_start);
510             break;
511         }
512         ii=1;
513         updvals[tmpl_idx[ii]] = stepper;
514         while (*stepper) {
515             if (*stepper == ':') {
516                 *stepper = '\0';
517                 ii++;
518                 if (ii<tmpl_cnt){                   
519                     updvals[tmpl_idx[ii]] = stepper+1;
520                 }
521             }
522             stepper++;
523         }
525         if (ii != tmpl_cnt-1) {
526             rrd_set_error("expected %lu data source readings (got %lu) from %s:...",
527                           tmpl_cnt-1, ii, argv[arg_i]);
528             free(step_start);
529             break;
530         }
531         
532         /* get the time from the reading ... handle N */
533         if (timesyntax == atstyle) {
534             if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
535                 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error );
536                 free(step_start);
537                 break;
538             }
539             if (ds_tv.type == RELATIVE_TO_END_TIME ||
540                 ds_tv.type == RELATIVE_TO_START_TIME) {
541                 rrd_set_error("specifying time relative to the 'start' "
542                               "or 'end' makes no sense here: %s",
543                               updvals[0]);
544                 free(step_start);
545                 break;
546             }
548             current_time = mktime(&ds_tv.tm) + ds_tv.offset;
549             current_time_usec = 0; /* FIXME: how to handle usecs here ? */
550             
551         } else if (strcmp(updvals[0],"N")==0){
552             gettimeofday(&tmp_time, 0);
553             normalize_time(&tmp_time);
554             current_time = tmp_time.tv_sec;
555             current_time_usec = tmp_time.tv_usec;
556         } else {
557             double tmp;
558             tmp = strtod(updvals[0], 0);
559             current_time = floor(tmp);
560             current_time_usec = (long)((tmp - current_time) * 1000000L);
561         }
562         /* dont do any correction for old version RRDs */
563         if(version < 3) 
564             current_time_usec = 0;
565         
566         if(current_time <= rrd.live_head->last_up){
567             rrd_set_error("illegal attempt to update using time %ld when "
568                           "last update time is %ld (minimum one second step)",
569                           current_time, rrd.live_head->last_up);
570             free(step_start);
571             break;
572         }
573         
574         
575         /* seek to the beginning of the rra's */
576         if (rra_current != rra_begin) {
577             if(fseek(rrd_file, rra_begin, SEEK_SET) != 0) {
578                 rrd_set_error("seek error in rrd");
579                 free(step_start);
580                 break;
581             }
582             rra_current = rra_begin;
583         }
584         rra_start = rra_begin;
586         /* when was the current pdp started */
587         proc_pdp_age = rrd.live_head->last_up % rrd.stat_head->pdp_step;
588         proc_pdp_st = rrd.live_head->last_up - proc_pdp_age;
590         /* when did the last pdp_st occur */
591         occu_pdp_age = current_time % rrd.stat_head->pdp_step;
592         occu_pdp_st = current_time - occu_pdp_age;
593         /* interval = current_time - rrd.live_head->last_up; */
594         interval    = current_time + ((double)current_time_usec - (double)rrd.live_head->last_up_usec)/1000000.0 - rrd.live_head->last_up;
595     
596         if (occu_pdp_st > proc_pdp_st){
597             /* OK we passed the pdp_st moment*/
598             pre_int =  occu_pdp_st - rrd.live_head->last_up; /* how much of the input data
599                                                               * occurred before the latest
600                                                               * pdp_st moment*/
601             pre_int -= ((double)rrd.live_head->last_up_usec)/1000000.0; /* adjust usecs */
602             post_int = occu_pdp_age;                         /* how much after it */
603             post_int += ((double)current_time_usec)/1000000.0;  /* adjust usecs */
604         } else {
605             pre_int = interval;
606             post_int = 0;
607         }
609 #ifdef DEBUG
610         printf(
611                "proc_pdp_age %lu\t"
612                "proc_pdp_st %lu\t" 
613                "occu_pfp_age %lu\t" 
614                "occu_pdp_st %lu\t"
615                "int %lf\t"
616                "pre_int %lf\t"
617                "post_int %lf\n", proc_pdp_age, proc_pdp_st, 
618                 occu_pdp_age, occu_pdp_st,
619                interval, pre_int, post_int);
620 #endif
621     
622         /* process the data sources and update the pdp_prep 
623          * area accordingly */
624         for(i=0;i<rrd.stat_head->ds_cnt;i++){
625             enum dst_en dst_idx;
626             dst_idx= dst_conv(rrd.ds_def[i].dst);
627                 /* NOTE: DST_CDEF should never enter this if block, because
628                  * updvals[i+1][0] is initialized to 'U'; unless the caller
629                  * accidently specified a value for the DST_CDEF. To handle 
630                  * this case, an extra check is required. */
631             if((updvals[i+1][0] != 'U') &&
632                    (dst_idx != DST_CDEF) &&
633                rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {
634                double rate = DNAN;
635                /* the data source type defines how to process the data */
636                 /* pdp_new contains rate * time ... eg the bytes
637                  * transferred during the interval. Doing it this way saves
638                  * a lot of math operations */
639                 
641                 switch(dst_idx){
642                 case DST_COUNTER:
643                 case DST_DERIVE:
644                     if(rrd.pdp_prep[i].last_ds[0] != 'U'){
645                       for(ii=0;updvals[i+1][ii] != '\0';ii++){
646                             if(updvals[i+1][ii] < '0' || updvals[i+1][ii] > '9' || (ii==0 && updvals[i+1][ii] == '-')){
647                                  rrd_set_error("not a simple integer: '%s'",updvals[i+1]);
648                                  break;
649                             }
650                        }
651                        if (rrd_test_error()){
652                             break;
653                        }
654                        pdp_new[i]= rrd_diff(updvals[i+1],rrd.pdp_prep[i].last_ds);
655                        if(dst_idx == DST_COUNTER) {
656                           /* simple overflow catcher suggested by Andres Kroonmaa */
657                           /* this will fail terribly for non 32 or 64 bit counters ... */
658                           /* are there any others in SNMP land ? */
659                           if (pdp_new[i] < (double)0.0 ) 
660                             pdp_new[i] += (double)4294967296.0 ;  /* 2^32 */
661                           if (pdp_new[i] < (double)0.0 ) 
662                             pdp_new[i] += (double)18446744069414584320.0; /* 2^64-2^32 */;
663                        }
664                        rate = pdp_new[i] / interval;
665                     }
666                    else {
667                      pdp_new[i]= DNAN;          
668                    }
669                    break;
670                 case DST_ABSOLUTE:
671                     errno = 0;
672                     pdp_new[i] = strtod(updvals[i+1],&endptr);
673                     if (errno > 0){
674                         rrd_set_error("converting  '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
675                         break;
676                     };
677                     if (endptr[0] != '\0'){
678                         rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
679                         break;
680                     }
681                     rate = pdp_new[i] / interval;                 
682                     break;
683                 case DST_GAUGE:
684                     errno = 0;
685                     pdp_new[i] = strtod(updvals[i+1],&endptr) * interval;
686                     if (errno > 0){
687                         rrd_set_error("converting  '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
688                         break;
689                     };
690                     if (endptr[0] != '\0'){
691                         rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
692                         break;
693                     }
694                     rate = pdp_new[i] / interval;                  
695                     break;
696                 default:
697                     rrd_set_error("rrd contains unknown DS type : '%s'",
698                                   rrd.ds_def[i].dst);
699                     break;
700                 }
701                 /* break out of this for loop if the error string is set */
702                 if (rrd_test_error()){
703                     break;
704                 }
705                /* make sure pdp_temp is neither too large or too small
706                 * if any of these occur it becomes unknown ...
707                 * sorry folks ... */
708                if ( ! isnan(rate) && 
709                     (( ! isnan(rrd.ds_def[i].par[DS_max_val].u_val) &&
710                          rate > rrd.ds_def[i].par[DS_max_val].u_val ) ||     
711                     ( ! isnan(rrd.ds_def[i].par[DS_min_val].u_val) &&
712                         rate < rrd.ds_def[i].par[DS_min_val].u_val ))){
713                   pdp_new[i] = DNAN;
714                }               
715             } else {
716                 /* no news is news all the same */
717                 pdp_new[i] = DNAN;
718             }
719             
720             /* make a copy of the command line argument for the next run */
721 #ifdef DEBUG
722             fprintf(stderr,
723                     "prep ds[%lu]\t"
724                     "last_arg '%s'\t"
725                     "this_arg '%s'\t"
726                     "pdp_new %10.2f\n",
727                     i,
728                     rrd.pdp_prep[i].last_ds,
729                     updvals[i+1], pdp_new[i]);
730 #endif
731             if(dst_idx == DST_COUNTER || dst_idx == DST_DERIVE){
732                 strncpy(rrd.pdp_prep[i].last_ds,
733                         updvals[i+1],LAST_DS_LEN-1);
734                 rrd.pdp_prep[i].last_ds[LAST_DS_LEN-1]='\0';
735             }
736         }
737         /* break out of the argument parsing loop if the error_string is set */
738         if (rrd_test_error()){
739             free(step_start);
740             break;
741         }
742         /* has a pdp_st moment occurred since the last run ? */
744         if (proc_pdp_st == occu_pdp_st){
745             /* no we have not passed a pdp_st moment. therefore update is simple */
747             for(i=0;i<rrd.stat_head->ds_cnt;i++){
748                 if(isnan(pdp_new[i]))
749                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += interval;
750                 else
751                     rrd.pdp_prep[i].scratch[PDP_val].u_val+= pdp_new[i];
752 #ifdef DEBUG
753                 fprintf(stderr,
754                         "NO PDP  ds[%lu]\t"
755                         "value %10.2f\t"
756                         "unkn_sec %5lu\n",
757                         i,
758                         rrd.pdp_prep[i].scratch[PDP_val].u_val,
759                         rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
760 #endif
761             }   
762         } else {
763             /* an pdp_st has occurred. */
765             /* in pdp_prep[].scratch[PDP_val].u_val we have collected rate*seconds which 
766              * occurred up to the last run.        
767             pdp_new[] contains rate*seconds from the latest run.
768             pdp_temp[] will contain the rate for cdp */
770             for(i=0;i<rrd.stat_head->ds_cnt;i++){
771                 /* update pdp_prep to the current pdp_st */
772                 if(isnan(pdp_new[i]))
773                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += pre_int;
774                 else
775                     rrd.pdp_prep[i].scratch[PDP_val].u_val += 
776                         pdp_new[i]/(double)interval*(double)pre_int;
778                 /* if too much of the pdp_prep is unknown we dump it */
779                 if ((rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt 
780                      > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) ||
781                     (occu_pdp_st-proc_pdp_st <= 
782                      rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)) {
783                     pdp_temp[i] = DNAN;
784                 } else {
785                     pdp_temp[i] = rrd.pdp_prep[i].scratch[PDP_val].u_val
786                         / (double)( occu_pdp_st
787                                    - proc_pdp_st
788                                    - rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
789                 }
791                 /* process CDEF data sources; remember each CDEF DS can
792                  * only reference other DS with a lower index number */
793             if (dst_conv(rrd.ds_def[i].dst) == DST_CDEF) {
794                    rpnp_t *rpnp;
795                    rpnp = rpn_expand((rpn_cdefds_t *) &(rrd.ds_def[i].par[DS_cdef]));
796                    /* substitue data values for OP_VARIABLE nodes */
797                    for (ii = 0; rpnp[ii].op != OP_END; ii++)
798                    {
799                           if (rpnp[ii].op == OP_VARIABLE) {
800                                  rpnp[ii].op = OP_NUMBER;
801                                  rpnp[ii].val =  pdp_temp[rpnp[ii].ptr];
802                           }
803                    }
804                    /* run the rpn calculator */
805                    if (rpn_calc(rpnp,&rpnstack,0,pdp_temp,i) == -1) {
806                           free(rpnp);
807                           break; /* exits the data sources pdp_temp loop */
808                    }
809                 }
810         
811                 /* make pdp_prep ready for the next run */
812                 if(isnan(pdp_new[i])){
813                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = post_int;
814                     rrd.pdp_prep[i].scratch[PDP_val].u_val = 0.0;
815                 } else {
816                     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = 0;
817                     rrd.pdp_prep[i].scratch[PDP_val].u_val = 
818                         pdp_new[i]/(double)interval*(double)post_int;
819                 }
821 #ifdef DEBUG
822                 fprintf(stderr,
823                         "PDP UPD ds[%lu]\t"
824                         "pdp_temp %10.2f\t"
825                         "new_prep %10.2f\t"
826                         "new_unkn_sec %5lu\n",
827                         i, pdp_temp[i],
828                         rrd.pdp_prep[i].scratch[PDP_val].u_val,
829                         rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
830 #endif
831             }
833                 /* if there were errors during the last loop, bail out here */
834             if (rrd_test_error()){
835                free(step_start);
836                break;
837             }
839                 /* compute the number of elapsed pdp_st moments */
840                 elapsed_pdp_st = (occu_pdp_st - proc_pdp_st) / rrd.stat_head -> pdp_step;
841 #ifdef DEBUG
842                 fprintf(stderr,"elapsed PDP steps: %lu\n", elapsed_pdp_st);
843 #endif
844                 if (rra_step_cnt == NULL)
845                 {
846                    rra_step_cnt = (unsigned long *) 
847                           malloc((rrd.stat_head->rra_cnt)* sizeof(unsigned long));
848                 }
850             for(i = 0, rra_start = rra_begin;
851                 i < rrd.stat_head->rra_cnt;
852             rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
853                 i++)
854                 {
855                 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
856                 start_pdp_offset = rrd.rra_def[i].pdp_cnt -
857                    (proc_pdp_st / rrd.stat_head -> pdp_step) % rrd.rra_def[i].pdp_cnt;
858         if (start_pdp_offset <= elapsed_pdp_st) {
859            rra_step_cnt[i] = (elapsed_pdp_st - start_pdp_offset) / 
860                       rrd.rra_def[i].pdp_cnt + 1;
861             } else {
862                    rra_step_cnt[i] = 0;
863                 }
865                 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) 
866                 {
867                    /* If this is a bulk update, we need to skip ahead in the seasonal
868                         * arrays so that they will be correct for the next observed value;
869                         * note that for the bulk update itself, no update will occur to
870                         * DEVSEASONAL or SEASONAL; futhermore, HWPREDICT and DEVPREDICT will
871                         * be set to DNAN. */
872            if (rra_step_cnt[i] > 2) 
873                    {
874                           /* skip update by resetting rra_step_cnt[i],
875                            * note that this is not data source specific; this is due
876                            * to the bulk update, not a DNAN value for the specific data
877                            * source. */
878                           rra_step_cnt[i] = 0;
879               lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st, 
880                              &last_seasonal_coef);
881                       lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st + 1,
882                              &seasonal_coef);
883                    }
884                 
885                   /* periodically run a smoother for seasonal effects */
886                   /* Need to use first cdp parameter buffer to track
887                    * burnin (burnin requires a specific smoothing schedule).
888                    * The CDP_init_seasonal parameter is really an RRA level,
889                    * not a data source within RRA level parameter, but the rra_def
890                    * is read only for rrd_update (not flushed to disk). */
891                   iii = i*(rrd.stat_head -> ds_cnt);
892                   if (rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt 
893                           <= BURNIN_CYCLES)
894                   {
895                      if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st 
896                                  > rrd.rra_def[i].row_cnt - 1) {
897                            /* mark off one of the burnin cycles */
898                            ++(rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt);
899                        schedule_smooth = 1;
900                          }  
901                   } else {
902                          /* someone has no doubt invented a trick to deal with this
903                           * wrap around, but at least this code is clear. */
904                          if (rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt >
905                              rrd.rra_ptr[i].cur_row)
906                          {
907                                  /* here elapsed_pdp_st = rra_step_cnt[i] because of 1-1
908                                   * mapping between PDP and CDP */
909                                  if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
910                                         >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
911                                  {
912 #ifdef DEBUG
913                                         fprintf(stderr,
914                                         "schedule_smooth 1: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
915                     rrd.rra_ptr[i].cur_row, elapsed_pdp_st, 
916                                         rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
917 #endif
918                                         schedule_smooth = 1;
919                                  }
920              } else {
921                                  /* can't rely on negative numbers because we are working with
922                                   * unsigned values */
923                                  /* Don't need modulus here. If we've wrapped more than once, only
924                                   * one smooth is executed at the end. */
925                                  if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st >= rrd.rra_def[i].row_cnt
926                                         && rrd.rra_ptr[i].cur_row + elapsed_pdp_st - rrd.rra_def[i].row_cnt
927                                         >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
928                                  {
929 #ifdef DEBUG
930                                         fprintf(stderr,
931                                         "schedule_smooth 2: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
932                     rrd.rra_ptr[i].cur_row, elapsed_pdp_st, 
933                                         rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
934 #endif
935                                         schedule_smooth = 1;
936                                  }
937                          }
938                   }
940               rra_current = ftell(rrd_file); 
941                 } /* if cf is DEVSEASONAL or SEASONAL */
943         if (rrd_test_error()) break;
945                     /* update CDP_PREP areas */
946                     /* loop over data soures within each RRA */
947                     for(ii = 0;
948                         ii < rrd.stat_head->ds_cnt;
949                         ii++)
950                         {
951                         
952                         /* iii indexes the CDP prep area for this data source within the RRA */
953                         iii=i*rrd.stat_head->ds_cnt+ii;
955                         if (rrd.rra_def[i].pdp_cnt > 1) {
956                           
957                            if (rra_step_cnt[i] > 0) {
958                            /* If we are in this block, as least 1 CDP value will be written to
959                                 * disk, this is the CDP_primary_val entry. If more than 1 value needs
960                                 * to be written, then the "fill in" value is the CDP_secondary_val
961                                 * entry. */
962                                   if (isnan(pdp_temp[ii]))
963                   {
964                                          rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += start_pdp_offset;
965                                          rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
966                                   } else {
967                                          /* CDP_secondary value is the RRA "fill in" value for intermediary
968                                           * CDP data entries. No matter the CF, the value is the same because
969                                           * the average, max, min, and last of a list of identical values is
970                                           * the same, namely, the value itself. */
971                                          rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = pdp_temp[ii];
972                                   }
973                      
974                                   if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt
975                                       > rrd.rra_def[i].pdp_cnt*
976                                       rrd.rra_def[i].par[RRA_cdp_xff_val].u_val)
977                                   {
978                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
979                                          /* initialize carry over */
980                                          if (current_cf == CF_AVERAGE) {
981                                                    if (isnan(pdp_temp[ii])) { 
982                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
983                                                    } else {
984                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
985                                                                  ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
986                                                    }
987                                          } else {
988                                                 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
989                                          }
990                                   } else {
991                                          rrd_value_t cum_val, cur_val; 
992                                      switch (current_cf) {
993                                                 case CF_AVERAGE:
994                                                   cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, 0.0);
995                                                   cur_val = IFDNAN(pdp_temp[ii],0.0);
996                           rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val =
997                                                (cum_val + cur_val * start_pdp_offset) /
998                                            (rrd.rra_def[i].pdp_cnt
999                                                -rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt);
1000                                                    /* initialize carry over value */
1001                                                    if (isnan(pdp_temp[ii])) { 
1002                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
1003                                                    } else {
1004                                                           rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
1005                                                                  ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
1006                                                    }
1007                                                    break;
1008                                                 case CF_MAXIMUM:
1009                                                   cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, -DINF);
1010                                                   cur_val = IFDNAN(pdp_temp[ii],-DINF);
1011 #ifdef DEBUG
1012                                                   if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
1013                                                           isnan(pdp_temp[ii])) {
1014                                                      fprintf(stderr,
1015                                                                 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1016                                                                 i,ii);
1017                                                          exit(-1);
1018                                                   }
1019 #endif
1020                                                   if (cur_val > cum_val)
1021                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
1022                                                   else
1023                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
1024                                                   /* initialize carry over value */
1025                                                   rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1026                                                   break;
1027                                                 case CF_MINIMUM:
1028                                                   cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, DINF);
1029                                                   cur_val = IFDNAN(pdp_temp[ii],DINF);
1030 #ifdef DEBUG
1031                                                   if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
1032                                                           isnan(pdp_temp[ii])) {
1033                                                      fprintf(stderr,
1034                                                                 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1035                                                                 i,ii);
1036                                                          exit(-1);
1037                                                   }
1038 #endif
1039                                                   if (cur_val < cum_val)
1040                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
1041                                                   else
1042                                                          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
1043                                                   /* initialize carry over value */
1044                                                   rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1045                                                   break;
1046                                                 case CF_LAST:
1047                                                 default:
1048                                                    rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = pdp_temp[ii];
1049                                                    /* initialize carry over value */
1050                                                    rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1051                                                 break;
1052                                          }
1053                                   } /* endif meets xff value requirement for a valid value */
1054                                   /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
1055                                    * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1056                                   if (isnan(pdp_temp[ii]))
1057                                          rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 
1058                                                 (elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt;
1059                                   else
1060                                          rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 0;
1061                } else  /* rra_step_cnt[i]  == 0 */
1062                            {
1063 #ifdef DEBUG
1064                                   if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val)) {
1065                                   fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, DNAN\n",
1066                                          i,ii);
1067                                   } else {
1068                                   fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, %10.2f\n",
1069                                          i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1070                                   }
1071 #endif
1072                                   if (isnan(pdp_temp[ii])) {
1073                                  rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += elapsed_pdp_st;
1074                                   } else if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val))
1075                                   {
1076                                          if (current_cf == CF_AVERAGE) {
1077                                             rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
1078                                                    elapsed_pdp_st;
1079                                          } else {
1080                                             rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1081                                          }
1082 #ifdef DEBUG
1083                                          fprintf(stderr,"Initialize CDP_val for RRA %lu DS %lu: %10.2f\n",
1084                                             i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1085 #endif
1086                                   } else {
1087                                          switch (current_cf) {
1088                                          case CF_AVERAGE:
1089                                             rrd.cdp_prep[iii].scratch[CDP_val].u_val += pdp_temp[ii] *
1090                                                    elapsed_pdp_st;
1091                                                 break;
1092                                          case CF_MINIMUM:
1093                                                 if (pdp_temp[ii] < rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1094                                                    rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1095                                                 break; 
1096                                          case CF_MAXIMUM:
1097                                                 if (pdp_temp[ii] > rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1098                                                    rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1099                                                 break; 
1100                                          case CF_LAST:
1101                                          default:
1102                                                 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1103                                                 break;
1104                                          }
1105                                   }
1106                            }
1107                         } else { /* rrd.rra_def[i].pdp_cnt == 1 */
1108                            if (elapsed_pdp_st > 2)
1109                            {
1110                                    switch (current_cf) {
1111                                    case CF_AVERAGE:
1112                                    default:
1113                                   rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val=pdp_temp[ii];
1114                                   rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val=pdp_temp[ii];
1115                                           break;
1116                    case CF_SEASONAL:
1117                                    case CF_DEVSEASONAL:
1118                                           /* need to update cached seasonal values, so they are consistent
1119                                            * with the bulk update */
1120                       /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1121                                            * CDP_last_deviation are the same. */
1122                       rrd.cdp_prep[iii].scratch[CDP_hw_last_seasonal].u_val =
1123                                                  last_seasonal_coef[ii];
1124                                           rrd.cdp_prep[iii].scratch[CDP_hw_seasonal].u_val =
1125                                                  seasonal_coef[ii];
1126                                           break;
1127                    case CF_HWPREDICT:
1128                                           /* need to update the null_count and last_null_count.
1129                                            * even do this for non-DNAN pdp_temp because the
1130                                            * algorithm is not learning from batch updates. */
1131                                           rrd.cdp_prep[iii].scratch[CDP_null_count].u_cnt += 
1132                                                  elapsed_pdp_st;
1133                                           rrd.cdp_prep[iii].scratch[CDP_last_null_count].u_cnt += 
1134                                                  elapsed_pdp_st - 1;
1135                                           /* fall through */
1136                                    case CF_DEVPREDICT:
1137                                   rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
1138                                   rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
1139                                           break;
1140                    case CF_FAILURES:
1141                                           /* do not count missed bulk values as failures */
1142                                   rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = 0;
1143                                   rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = 0;
1144                                           /* need to reset violations buffer.
1145                                            * could do this more carefully, but for now, just
1146                                            * assume a bulk update wipes away all violations. */
1147                       erase_violations(&rrd, iii, i);
1148                                           break;
1149                                    }
1150                            } 
1151                         } /* endif rrd.rra_def[i].pdp_cnt == 1 */
1153                         if (rrd_test_error()) break;
1155                         } /* endif data sources loop */
1156         } /* end RRA Loop */
1158                 /* this loop is only entered if elapsed_pdp_st < 3 */
1159                 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val; 
1160                          j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val)
1161                 {
1162                for(i = 0, rra_start = rra_begin;
1163                    i < rrd.stat_head->rra_cnt;
1164                rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1165                    i++)
1166                    {
1167                           if (rrd.rra_def[i].pdp_cnt > 1) continue;
1169                   current_cf = cf_conv(rrd.rra_def[i].cf_nam);
1170                           if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
1171                           {
1172                          lookup_seasonal(&rrd,i,rra_start,rrd_file,
1173                                     elapsed_pdp_st + (scratch_idx == CDP_primary_val ? 1 : 2),
1174                                 &seasonal_coef);
1175                  rra_current = ftell(rrd_file);
1176                           }
1177                           if (rrd_test_error()) break;
1178                       /* loop over data soures within each RRA */
1179                       for(ii = 0;
1180                           ii < rrd.stat_head->ds_cnt;
1181                           ii++)
1182                           {
1183                              update_aberrant_CF(&rrd,pdp_temp[ii],current_cf,
1184                                         i*(rrd.stat_head->ds_cnt) + ii,i,ii,
1185                                     scratch_idx, seasonal_coef);
1186                           }
1187            } /* end RRA Loop */
1188                    if (rrd_test_error()) break;
1189             } /* end elapsed_pdp_st loop */
1191                 if (rrd_test_error()) break;
1193                 /* Ready to write to disk */
1194                 /* Move sequentially through the file, writing one RRA at a time.
1195                  * Note this architecture divorces the computation of CDP with
1196                  * flushing updated RRA entries to disk. */
1197             for(i = 0, rra_start = rra_begin;
1198                 i < rrd.stat_head->rra_cnt;
1199             rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1200                 i++) {
1201                 /* is there anything to write for this RRA? If not, continue. */
1202         if (rra_step_cnt[i] == 0) continue;
1204                 /* write the first row */
1205 #ifdef DEBUG
1206         fprintf(stderr,"  -- RRA Preseek %ld\n",ftell(rrd_file));
1207 #endif
1208             rrd.rra_ptr[i].cur_row++;
1209             if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
1210                    rrd.rra_ptr[i].cur_row = 0; /* wrap around */
1211                 /* positition on the first row */
1212                 rra_pos_tmp = rra_start +
1213                    (rrd.stat_head->ds_cnt)*(rrd.rra_ptr[i].cur_row)*sizeof(rrd_value_t);
1214                 if(rra_pos_tmp != rra_current) {
1215                    if(fseek(rrd_file, rra_pos_tmp, SEEK_SET) != 0){
1216                       rrd_set_error("seek error in rrd");
1217                       break;
1218                    }
1219                    rra_current = rra_pos_tmp;
1220                 }
1222 #ifdef DEBUG
1223             fprintf(stderr,"  -- RRA Postseek %ld\n",ftell(rrd_file));
1224 #endif
1225                 scratch_idx = CDP_primary_val;
1226                 if (pcdp_summary != NULL)
1227                 {
1228                    rra_time = (current_time - current_time 
1229                    % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1230                    - ((rra_step_cnt[i]-1)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1231                 }
1232                 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file, 
1233                    pcdp_summary, &rra_time);
1234                 if (rrd_test_error()) break;
1236                 /* write other rows of the bulk update, if any */
1237                 scratch_idx = CDP_secondary_val;
1238                for ( ; rra_step_cnt[i] > 1; rra_step_cnt[i]--)
1239                 {
1240                   if (++rrd.rra_ptr[i].cur_row == rrd.rra_def[i].row_cnt)
1241                    {
1242 #ifdef DEBUG
1243               fprintf(stderr,"Wraparound for RRA %s, %lu updates left\n",
1244                           rrd.rra_def[i].cf_nam, rra_step_cnt[i] - 1);
1245 #endif
1246                           /* wrap */
1247                           rrd.rra_ptr[i].cur_row = 0;
1248                           /* seek back to beginning of current rra */
1249                       if (fseek(rrd_file, rra_start, SEEK_SET) != 0)
1250                           {
1251                          rrd_set_error("seek error in rrd");
1252                          break;
1253                           }
1254 #ifdef DEBUG
1255                   fprintf(stderr,"  -- Wraparound Postseek %ld\n",ftell(rrd_file));
1256 #endif
1257                           rra_current = rra_start;
1258                    }
1259                    if (pcdp_summary != NULL)
1260                    {
1261                       rra_time = (current_time - current_time 
1262                       % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1263                       - ((rra_step_cnt[i]-2)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1264                    }
1265                    pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1266                       pcdp_summary, &rra_time);
1267                 }
1268                 
1269                 if (rrd_test_error())
1270                   break;
1271                 } /* RRA LOOP */
1273             /* break out of the argument parsing loop if error_string is set */
1274             if (rrd_test_error()){
1275                    free(step_start);
1276                    break;
1277             } 
1278             
1279         } /* endif a pdp_st has occurred */ 
1280         rrd.live_head->last_up = current_time;
1281         rrd.live_head->last_up_usec = current_time_usec; 
1282         free(step_start);
1283     } /* function argument loop */
1285     if (seasonal_coef != NULL) free(seasonal_coef);
1286     if (last_seasonal_coef != NULL) free(last_seasonal_coef);
1287         if (rra_step_cnt != NULL) free(rra_step_cnt);
1288     rpnstack_free(&rpnstack);
1290     /* if we got here and if there is an error and if the file has not been
1291      * written to, then close things up and return. */
1292     if (rrd_test_error()) {
1293         free(updvals);
1294         free(tmpl_idx);
1295         rrd_free(&rrd);
1296         free(pdp_temp);
1297         free(pdp_new);
1298         fclose(rrd_file);
1299         return(-1);
1300     }
1302     /* aargh ... that was tough ... so many loops ... anyway, its done.
1303      * we just need to write back the live header portion now*/
1305     if (fseek(rrd_file, (sizeof(stat_head_t)
1306                          + sizeof(ds_def_t)*rrd.stat_head->ds_cnt 
1307                          + sizeof(rra_def_t)*rrd.stat_head->rra_cnt),
1308               SEEK_SET) != 0) {
1309         rrd_set_error("seek rrd for live header writeback");
1310         free(updvals);
1311         free(tmpl_idx);
1312         rrd_free(&rrd);
1313         free(pdp_temp);
1314         free(pdp_new);
1315         fclose(rrd_file);
1316         return(-1);
1317     }
1319     if(version >= 3) {
1320             if(fwrite( rrd.live_head,
1321                        sizeof(live_head_t), 1, rrd_file) != 1){
1322                 rrd_set_error("fwrite live_head to rrd");
1323                 free(updvals);
1324                 rrd_free(&rrd);
1325                 free(tmpl_idx);
1326                 free(pdp_temp);
1327                 free(pdp_new);
1328                 fclose(rrd_file);
1329                 return(-1);
1330             }
1331     }
1332     else {
1333             if(fwrite( &rrd.live_head->last_up,
1334                        sizeof(time_t), 1, rrd_file) != 1){
1335                 rrd_set_error("fwrite live_head to rrd");
1336                 free(updvals);
1337                 rrd_free(&rrd);
1338                 free(tmpl_idx);
1339                 free(pdp_temp);
1340                 free(pdp_new);
1341                 fclose(rrd_file);
1342                 return(-1);
1343             }
1344     }
1345             
1347     if(fwrite( rrd.pdp_prep,
1348                sizeof(pdp_prep_t),
1349                rrd.stat_head->ds_cnt, rrd_file) != rrd.stat_head->ds_cnt){
1350         rrd_set_error("ftwrite pdp_prep to rrd");
1351         free(updvals);
1352         rrd_free(&rrd);
1353         free(tmpl_idx);
1354         free(pdp_temp);
1355         free(pdp_new);
1356         fclose(rrd_file);
1357         return(-1);
1358     }
1360     if(fwrite( rrd.cdp_prep,
1361                sizeof(cdp_prep_t),
1362                rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt, rrd_file) 
1363        != rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt){
1365         rrd_set_error("ftwrite cdp_prep to rrd");
1366         free(updvals);
1367         free(tmpl_idx);
1368         rrd_free(&rrd);
1369         free(pdp_temp);
1370         free(pdp_new);
1371         fclose(rrd_file);
1372         return(-1);
1373     }
1375     if(fwrite( rrd.rra_ptr,
1376                sizeof(rra_ptr_t), 
1377                rrd.stat_head->rra_cnt,rrd_file) != rrd.stat_head->rra_cnt){
1378         rrd_set_error("fwrite rra_ptr to rrd");
1379         free(updvals);
1380         free(tmpl_idx);
1381         rrd_free(&rrd);
1382         free(pdp_temp);
1383         free(pdp_new);
1384         fclose(rrd_file);
1385         return(-1);
1386     }
1388     /* OK now close the files and free the memory */
1389     if(fclose(rrd_file) != 0){
1390         rrd_set_error("closing rrd");
1391         free(updvals);
1392         free(tmpl_idx);
1393         rrd_free(&rrd);
1394         free(pdp_temp);
1395         free(pdp_new);
1396         return(-1);
1397     }
1399     /* calling the smoothing code here guarantees at most
1400          * one smoothing operation per rrd_update call. Unfortunately,
1401          * it is possible with bulk updates, or a long-delayed update
1402          * for smoothing to occur off-schedule. This really isn't
1403          * critical except during the burning cycles. */
1404         if (schedule_smooth)
1405         {
1406 #ifndef WIN32
1407           rrd_file = fopen(filename,"r+");
1408 #else
1409           rrd_file = fopen(filename,"rb+");
1410 #endif
1411           rra_start = rra_begin;
1412           for (i = 0; i < rrd.stat_head -> rra_cnt; ++i)
1413           {
1414             if (cf_conv(rrd.rra_def[i].cf_nam) == CF_DEVSEASONAL ||
1415                 cf_conv(rrd.rra_def[i].cf_nam) == CF_SEASONAL)
1416             {
1417 #ifdef DEBUG
1418               fprintf(stderr,"Running smoother for rra %ld\n",i);
1419 #endif
1420               apply_smoother(&rrd,i,rra_start,rrd_file);
1421               if (rrd_test_error())
1422                 break;
1423             }
1424             rra_start += rrd.rra_def[i].row_cnt
1425               *rrd.stat_head->ds_cnt*sizeof(rrd_value_t);
1426           }
1427           fclose(rrd_file);
1428         }
1429     rrd_free(&rrd);
1430     free(updvals);
1431     free(tmpl_idx);
1432     free(pdp_new);
1433     free(pdp_temp);
1434     return(0);
1437 /*
1438  * get exclusive lock to whole file.
1439  * lock gets removed when we close the file
1440  *
1441  * returns 0 on success
1442  */
1443 int
1444 LockRRD(FILE *rrdfile)
1446     int rrd_fd;         /* File descriptor for RRD */
1447     int rcstat;
1449     rrd_fd = fileno(rrdfile);
1451         {
1452 #ifndef WIN32    
1453     struct flock        lock;
1454     lock.l_type = F_WRLCK;    /* exclusive write lock */
1455     lock.l_len = 0;           /* whole file */
1456     lock.l_start = 0;         /* start of file */
1457     lock.l_whence = SEEK_SET;   /* end of file */
1459     rcstat = fcntl(rrd_fd, F_SETLK, &lock);
1460 #else
1461     struct _stat st;
1463     if ( _fstat( rrd_fd, &st ) == 0 ) {
1464             rcstat = _locking ( rrd_fd, _LK_NBLCK, st.st_size );
1465     } else {
1466             rcstat = -1;
1467     }
1468 #endif
1469         }
1471     return(rcstat);
1475 info_t
1476 *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1477                unsigned short CDP_scratch_idx, FILE *rrd_file,
1478                    info_t *pcdp_summary, time_t *rra_time)
1480    unsigned long ds_idx, cdp_idx;
1481    infoval iv;
1482   
1483    for (ds_idx = 0; ds_idx < rrd -> stat_head -> ds_cnt; ds_idx++)
1484    {
1485       /* compute the cdp index */
1486       cdp_idx =rra_idx * (rrd -> stat_head->ds_cnt) + ds_idx;
1487 #ifdef DEBUG
1488           fprintf(stderr,"  -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1489              rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,ftell(rrd_file),
1490              rrd -> rra_def[rra_idx].cf_nam);
1491 #endif 
1492       if (pcdp_summary != NULL)
1493           {
1494              iv.u_val = rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
1495              /* append info to the return hash */
1496                  pcdp_summary = info_push(pcdp_summary,
1497                  sprintf_alloc("[%d]RRA[%s][%lu]DS[%s]",
1498                  *rra_time, rrd->rra_def[rra_idx].cf_nam, 
1499                  rrd->rra_def[rra_idx].pdp_cnt, rrd->ds_def[ds_idx].ds_nam),
1500          RD_I_VAL, iv);
1501           }
1502           if(fwrite(&(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1503                  sizeof(rrd_value_t),1,rrd_file) != 1)
1504           { 
1505              rrd_set_error("writing rrd");
1506              return 0;
1507           }
1508           *rra_current += sizeof(rrd_value_t);
1509         }
1510         return (pcdp_summary);