1 /*****************************************************************************
2 * RRDtool 1.2.26 Copyright by Tobi Oetiker, 1997-2007
3 *****************************************************************************
4 * rrd_update.c RRD Update Function
5 *****************************************************************************
6 * $Id: rrd_update.c 1235 2007-11-20 00:15:07Z oetiker $
7 *****************************************************************************/
9 #include "rrd_tool.h"
10 #include <sys/types.h>
11 #include <fcntl.h>
12 #ifdef HAVE_MMAP
13 # include <sys/mman.h>
14 #endif
16 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
17 #include <sys/locking.h>
18 #include <sys/stat.h>
19 #include <io.h>
20 #endif
22 #include "rrd_hw.h"
23 #include "rrd_rpncalc.h"
25 #include "rrd_is_thread_safe.h"
26 #include "unused.h"
28 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
29 /*
30 * WIN32 does not have gettimeofday and struct timeval. This is a quick and dirty
31 * replacement.
32 */
33 #include <sys/timeb.h>
35 #ifndef __MINGW32__
36 struct timeval {
37 time_t tv_sec; /* seconds */
38 long tv_usec; /* microseconds */
39 };
40 #endif
42 struct __timezone {
43 int tz_minuteswest; /* minutes W of Greenwich */
44 int tz_dsttime; /* type of dst correction */
45 };
47 static int gettimeofday(struct timeval *t, struct __timezone *tz) {
49 struct _timeb current_time;
51 _ftime(¤t_time);
53 t->tv_sec = current_time.time;
54 t->tv_usec = current_time.millitm * 1000;
56 return 0;
57 }
59 #endif
60 /*
61 * normilize time as returned by gettimeofday. usec part must
62 * be always >= 0
63 */
64 static void normalize_time(struct timeval *t)
65 {
66 if(t->tv_usec < 0) {
67 t->tv_sec--;
68 t->tv_usec += 1000000L;
69 }
70 }
72 /* Local prototypes */
73 int LockRRD(FILE *rrd_file);
74 #ifdef HAVE_MMAP
75 info_t *write_RRA_row (rrd_t *rrd, unsigned long rra_idx,
76 unsigned long *rra_current,
77 unsigned short CDP_scratch_idx,
78 #ifndef DEBUG
79 FILE UNUSED(*rrd_file),
80 #else
81 FILE *rrd_file,
82 #endif
83 info_t *pcdp_summary, time_t *rra_time, void *rrd_mmaped_file);
84 #else
85 info_t *write_RRA_row (rrd_t *rrd, unsigned long rra_idx,
86 unsigned long *rra_current,
87 unsigned short CDP_scratch_idx, FILE *rrd_file,
88 info_t *pcdp_summary, time_t *rra_time);
89 #endif
90 int rrd_update_r(const char *filename, const char *tmplt, int argc, const char **argv);
91 int _rrd_update(const char *filename, const char *tmplt, int argc, const char **argv,
92 info_t*);
94 #define IFDNAN(X,Y) (isnan(X) ? (Y) : (X));
97 info_t *rrd_update_v(int argc, char **argv)
98 {
99 char *tmplt = NULL;
100 info_t *result = NULL;
101 infoval rc;
102 rc.u_int = -1;
103 optind = 0; opterr = 0; /* initialize getopt */
105 while (1) {
106 static struct option long_options[] =
107 {
108 {"template", required_argument, 0, 't'},
109 {0,0,0,0}
110 };
111 int option_index = 0;
112 int opt;
113 opt = getopt_long(argc, argv, "t:",
114 long_options, &option_index);
116 if (opt == EOF)
117 break;
119 switch(opt) {
120 case 't':
121 tmplt = optarg;
122 break;
124 case '?':
125 rrd_set_error("unknown option '%s'",argv[optind-1]);
126 goto end_tag;
127 }
128 }
130 /* need at least 2 arguments: filename, data. */
131 if (argc-optind < 2) {
132 rrd_set_error("Not enough arguments");
133 goto end_tag;
134 }
135 rc.u_int = 0;
136 result = info_push(NULL,sprintf_alloc("return_value"),RD_I_INT,rc);
137 rc.u_int = _rrd_update(argv[optind], tmplt,
138 argc - optind - 1, (const char **)(argv + optind + 1), result);
139 result->value.u_int = rc.u_int;
140 end_tag:
141 return result;
142 }
144 int
145 rrd_update(int argc, char **argv)
146 {
147 char *tmplt = NULL;
148 int rc;
149 optind = 0; opterr = 0; /* initialize getopt */
151 while (1) {
152 static struct option long_options[] =
153 {
154 {"template", required_argument, 0, 't'},
155 {0,0,0,0}
156 };
157 int option_index = 0;
158 int opt;
159 opt = getopt_long(argc, argv, "t:",
160 long_options, &option_index);
162 if (opt == EOF)
163 break;
165 switch(opt) {
166 case 't':
167 tmplt = optarg;
168 break;
170 case '?':
171 rrd_set_error("unknown option '%s'",argv[optind-1]);
172 return(-1);
173 }
174 }
176 /* need at least 2 arguments: filename, data. */
177 if (argc-optind < 2) {
178 rrd_set_error("Not enough arguments");
180 return -1;
181 }
183 rc = rrd_update_r(argv[optind], tmplt,
184 argc - optind - 1, (const char **)(argv + optind + 1));
185 return rc;
186 }
188 int
189 rrd_update_r(const char *filename, const char *tmplt, int argc, const char **argv)
190 {
191 return _rrd_update(filename, tmplt, argc, argv, NULL);
192 }
194 int
195 _rrd_update(const char *filename, const char *tmplt, int argc, const char **argv,
196 info_t *pcdp_summary)
197 {
199 int arg_i = 2;
200 short j;
201 unsigned long i,ii,iii=1;
203 unsigned long rra_begin; /* byte pointer to the rra
204 * area in the rrd file. this
205 * pointer never changes value */
206 unsigned long rra_start; /* byte pointer to the rra
207 * area in the rrd file. this
208 * pointer changes as each rrd is
209 * processed. */
210 unsigned long rra_current; /* byte pointer to the current write
211 * spot in the rrd file. */
212 unsigned long rra_pos_tmp; /* temporary byte pointer. */
213 double interval,
214 pre_int,post_int; /* interval between this and
215 * the last run */
216 unsigned long proc_pdp_st; /* which pdp_st was the last
217 * to be processed */
218 unsigned long occu_pdp_st; /* when was the pdp_st
219 * before the last update
220 * time */
221 unsigned long proc_pdp_age; /* how old was the data in
222 * the pdp prep area when it
223 * was last updated */
224 unsigned long occu_pdp_age; /* how long ago was the last
225 * pdp_step time */
226 rrd_value_t *pdp_new; /* prepare the incoming data
227 * to be added the the
228 * existing entry */
229 rrd_value_t *pdp_temp; /* prepare the pdp values
230 * to be added the the
231 * cdp values */
233 long *tmpl_idx; /* index representing the settings
234 transported by the tmplt index */
235 unsigned long tmpl_cnt = 2; /* time and data */
237 FILE *rrd_file;
238 rrd_t rrd;
239 time_t current_time = 0;
240 time_t rra_time = 0; /* time of update for a RRA */
241 unsigned long current_time_usec=0;/* microseconds part of current time */
242 struct timeval tmp_time; /* used for time conversion */
244 char **updvals;
245 int schedule_smooth = 0;
246 rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
247 /* a vector of future Holt-Winters seasonal coefs */
248 unsigned long elapsed_pdp_st;
249 /* number of elapsed PDP steps since last update */
250 unsigned long *rra_step_cnt = NULL;
251 /* number of rows to be updated in an RRA for a data
252 * value. */
253 unsigned long start_pdp_offset;
254 /* number of PDP steps since the last update that
255 * are assigned to the first CDP to be generated
256 * since the last update. */
257 unsigned short scratch_idx;
258 /* index into the CDP scratch array */
259 enum cf_en current_cf;
260 /* numeric id of the current consolidation function */
261 rpnstack_t rpnstack; /* used for COMPUTE DS */
262 int version; /* rrd version */
263 char *endptr; /* used in the conversion */
265 #ifdef HAVE_MMAP
266 void *rrd_mmaped_file;
267 unsigned long rrd_filesize;
268 #endif
271 rpnstack_init(&rpnstack);
273 /* need at least 1 arguments: data. */
274 if (argc < 1) {
275 rrd_set_error("Not enough arguments");
276 return -1;
277 }
281 if(rrd_open(filename,&rrd_file,&rrd, RRD_READWRITE)==-1){
282 return -1;
283 }
285 /* initialize time */
286 version = atoi(rrd.stat_head->version);
287 gettimeofday(&tmp_time, 0);
288 normalize_time(&tmp_time);
289 current_time = tmp_time.tv_sec;
290 if(version >= 3) {
291 current_time_usec = tmp_time.tv_usec;
292 }
293 else {
294 current_time_usec = 0;
295 }
297 rra_current = rra_start = rra_begin = ftell(rrd_file);
298 /* This is defined in the ANSI C standard, section 7.9.5.3:
300 When a file is opened with udpate mode ('+' as the second
301 or third character in the ... list of mode argument
302 variables), both input and ouptut may be performed on the
303 associated stream. However, ... input may not be directly
304 followed by output without an intervening call to a file
305 positioning function, unless the input oepration encounters
306 end-of-file. */
307 #ifdef HAVE_MMAP
308 fseek(rrd_file, 0, SEEK_END);
309 rrd_filesize = ftell(rrd_file);
310 fseek(rrd_file, rra_current, SEEK_SET);
311 #else
312 fseek(rrd_file, 0, SEEK_CUR);
313 #endif
316 /* get exclusive lock to whole file.
317 * lock gets removed when we close the file.
318 */
319 if (LockRRD(rrd_file) != 0) {
320 rrd_set_error("could not lock RRD");
321 rrd_free(&rrd);
322 fclose(rrd_file);
323 return(-1);
324 }
326 if((updvals = malloc( sizeof(char*) * (rrd.stat_head->ds_cnt+1)))==NULL){
327 rrd_set_error("allocating updvals pointer array");
328 rrd_free(&rrd);
329 fclose(rrd_file);
330 return(-1);
331 }
333 if ((pdp_temp = malloc(sizeof(rrd_value_t)
334 *rrd.stat_head->ds_cnt))==NULL){
335 rrd_set_error("allocating pdp_temp ...");
336 free(updvals);
337 rrd_free(&rrd);
338 fclose(rrd_file);
339 return(-1);
340 }
342 if ((tmpl_idx = malloc(sizeof(unsigned long)
343 *(rrd.stat_head->ds_cnt+1)))==NULL){
344 rrd_set_error("allocating tmpl_idx ...");
345 free(pdp_temp);
346 free(updvals);
347 rrd_free(&rrd);
348 fclose(rrd_file);
349 return(-1);
350 }
351 /* initialize tmplt redirector */
352 /* default config example (assume DS 1 is a CDEF DS)
353 tmpl_idx[0] -> 0; (time)
354 tmpl_idx[1] -> 1; (DS 0)
355 tmpl_idx[2] -> 3; (DS 2)
356 tmpl_idx[3] -> 4; (DS 3) */
357 tmpl_idx[0] = 0; /* time */
358 for (i = 1, ii = 1 ; i <= rrd.stat_head->ds_cnt ; i++)
359 {
360 if (dst_conv(rrd.ds_def[i-1].dst) != DST_CDEF)
361 tmpl_idx[ii++]=i;
362 }
363 tmpl_cnt= ii;
365 if (tmplt) {
366 /* we should work on a writeable copy here */
367 char *dsname;
368 unsigned int tmpl_len;
369 char *tmplt_copy = strdup(tmplt);
370 dsname = tmplt_copy;
371 tmpl_cnt = 1; /* the first entry is the time */
372 tmpl_len = strlen(tmplt_copy);
373 for(i=0;i<=tmpl_len ;i++) {
374 if (tmplt_copy[i] == ':' || tmplt_copy[i] == '\0') {
375 tmplt_copy[i] = '\0';
376 if (tmpl_cnt>rrd.stat_head->ds_cnt){
377 rrd_set_error("tmplt contains more DS definitions than RRD");
378 free(updvals); free(pdp_temp);
379 free(tmpl_idx); rrd_free(&rrd);
380 fclose(rrd_file); return(-1);
381 }
382 if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd,dsname)) == -1){
383 rrd_set_error("unknown DS name '%s'",dsname);
384 free(updvals); free(pdp_temp);
385 free(tmplt_copy);
386 free(tmpl_idx); rrd_free(&rrd);
387 fclose(rrd_file); return(-1);
388 } else {
389 /* the first element is always the time */
390 tmpl_idx[tmpl_cnt-1]++;
391 /* go to the next entry on the tmplt_copy */
392 dsname = &tmplt_copy[i+1];
393 /* fix the damage we did before */
394 if (i<tmpl_len) {
395 tmplt_copy[i]=':';
396 }
398 }
399 }
400 }
401 free(tmplt_copy);
402 }
403 if ((pdp_new = malloc(sizeof(rrd_value_t)
404 *rrd.stat_head->ds_cnt))==NULL){
405 rrd_set_error("allocating pdp_new ...");
406 free(updvals);
407 free(pdp_temp);
408 free(tmpl_idx);
409 rrd_free(&rrd);
410 fclose(rrd_file);
411 return(-1);
412 }
414 #ifdef HAVE_MMAP
415 rrd_mmaped_file = mmap(0,
416 rrd_filesize,
417 PROT_READ | PROT_WRITE,
418 MAP_SHARED,
419 fileno(rrd_file),
420 0);
421 if (rrd_mmaped_file == MAP_FAILED) {
422 rrd_set_error("error mmapping file %s", filename);
423 free(updvals);
424 free(pdp_temp);
425 free(tmpl_idx);
426 rrd_free(&rrd);
427 fclose(rrd_file);
428 return(-1);
429 }
430 #ifdef HAVE_MADVISE
431 /* when we use mmaping we tell the kernel the mmap equivalent
432 of POSIX_FADV_RANDOM */
433 madvise(rrd_mmaped_file,rrd_filesize,POSIX_MADV_RANDOM);
434 #endif
435 #endif
436 /* loop through the arguments. */
437 for(arg_i=0; arg_i<argc;arg_i++) {
438 char *stepper = strdup(argv[arg_i]);
439 char *step_start = stepper;
440 char *p;
441 char *parsetime_error = NULL;
442 enum {atstyle, normal} timesyntax;
443 struct rrd_time_value ds_tv;
444 if (stepper == NULL){
445 rrd_set_error("failed duplication argv entry");
446 free(step_start);
447 free(updvals);
448 free(pdp_temp);
449 free(tmpl_idx);
450 rrd_free(&rrd);
451 #ifdef HAVE_MMAP
452 munmap(rrd_mmaped_file, rrd_filesize);
453 #endif
454 fclose(rrd_file);
455 return(-1);
456 }
457 /* initialize all ds input to unknown except the first one
458 which has always got to be set */
459 for(ii=1;ii<=rrd.stat_head->ds_cnt;ii++) updvals[ii] = "U";
460 updvals[0]=stepper;
461 /* separate all ds elements; first must be examined separately
462 due to alternate time syntax */
463 if ((p=strchr(stepper,'@'))!=NULL) {
464 timesyntax = atstyle;
465 *p = '\0';
466 stepper = p+1;
467 } else if ((p=strchr(stepper,':'))!=NULL) {
468 timesyntax = normal;
469 *p = '\0';
470 stepper = p+1;
471 } else {
472 rrd_set_error("expected timestamp not found in data source from %s",
473 argv[arg_i]);
474 free(step_start);
475 break;
476 }
477 ii=1;
478 updvals[tmpl_idx[ii]] = stepper;
479 while (*stepper) {
480 if (*stepper == ':') {
481 *stepper = '\0';
482 ii++;
483 if (ii<tmpl_cnt){
484 updvals[tmpl_idx[ii]] = stepper+1;
485 }
486 }
487 stepper++;
488 }
490 if (ii != tmpl_cnt-1) {
491 rrd_set_error("expected %lu data source readings (got %lu) from %s",
492 tmpl_cnt-1, ii, argv[arg_i]);
493 free(step_start);
494 break;
495 }
497 /* get the time from the reading ... handle N */
498 if (timesyntax == atstyle) {
499 if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
500 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error );
501 free(step_start);
502 break;
503 }
504 if (ds_tv.type == RELATIVE_TO_END_TIME ||
505 ds_tv.type == RELATIVE_TO_START_TIME) {
506 rrd_set_error("specifying time relative to the 'start' "
507 "or 'end' makes no sense here: %s",
508 updvals[0]);
509 free(step_start);
510 break;
511 }
513 current_time = mktime(&ds_tv.tm) + ds_tv.offset;
514 current_time_usec = 0; /* FIXME: how to handle usecs here ? */
516 } else if (strcmp(updvals[0],"N")==0){
517 gettimeofday(&tmp_time, 0);
518 normalize_time(&tmp_time);
519 current_time = tmp_time.tv_sec;
520 current_time_usec = tmp_time.tv_usec;
521 } else {
522 double tmp;
523 tmp = strtod(updvals[0], 0);
524 current_time = floor(tmp);
525 current_time_usec = (long)((tmp-(double)current_time) * 1000000.0);
526 }
527 /* dont do any correction for old version RRDs */
528 if(version < 3)
529 current_time_usec = 0;
531 if(current_time < rrd.live_head->last_up ||
532 (current_time == rrd.live_head->last_up &&
533 (long)current_time_usec <= (long)rrd.live_head->last_up_usec)) {
534 rrd_set_error("illegal attempt to update using time %ld when "
535 "last update time is %ld (minimum one second step)",
536 current_time, rrd.live_head->last_up);
537 free(step_start);
538 break;
539 }
542 /* seek to the beginning of the rra's */
543 if (rra_current != rra_begin) {
544 #ifndef HAVE_MMAP
545 if(fseek(rrd_file, rra_begin, SEEK_SET) != 0) {
546 rrd_set_error("seek error in rrd");
547 free(step_start);
548 break;
549 }
550 #endif
551 rra_current = rra_begin;
552 }
553 rra_start = rra_begin;
555 /* when was the current pdp started */
556 proc_pdp_age = rrd.live_head->last_up % rrd.stat_head->pdp_step;
557 proc_pdp_st = rrd.live_head->last_up - proc_pdp_age;
559 /* when did the last pdp_st occur */
560 occu_pdp_age = current_time % rrd.stat_head->pdp_step;
561 occu_pdp_st = current_time - occu_pdp_age;
563 /* interval = current_time - rrd.live_head->last_up; */
564 interval = (double)(current_time - rrd.live_head->last_up)
565 + (double)((long)current_time_usec - (long)rrd.live_head->last_up_usec)/1000000.0;
567 if (occu_pdp_st > proc_pdp_st){
568 /* OK we passed the pdp_st moment*/
569 pre_int = (long)occu_pdp_st - rrd.live_head->last_up; /* how much of the input data
570 * occurred before the latest
571 * pdp_st moment*/
572 pre_int -= ((double)rrd.live_head->last_up_usec)/1000000.0; /* adjust usecs */
573 post_int = occu_pdp_age; /* how much after it */
574 post_int += ((double)current_time_usec)/1000000.0; /* adjust usecs */
575 } else {
576 pre_int = interval;
577 post_int = 0;
578 }
580 #ifdef DEBUG
581 printf(
582 "proc_pdp_age %lu\t"
583 "proc_pdp_st %lu\t"
584 "occu_pfp_age %lu\t"
585 "occu_pdp_st %lu\t"
586 "int %lf\t"
587 "pre_int %lf\t"
588 "post_int %lf\n", proc_pdp_age, proc_pdp_st,
589 occu_pdp_age, occu_pdp_st,
590 interval, pre_int, post_int);
591 #endif
593 /* process the data sources and update the pdp_prep
594 * area accordingly */
595 for(i=0;i<rrd.stat_head->ds_cnt;i++){
596 enum dst_en dst_idx;
597 dst_idx= dst_conv(rrd.ds_def[i].dst);
599 /* make sure we do not build diffs with old last_ds values */
600 if(rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt < interval) {
601 strncpy(rrd.pdp_prep[i].last_ds,"U",LAST_DS_LEN-1);
602 rrd.pdp_prep[i].last_ds[LAST_DS_LEN-1]='\0';
603 }
605 /* NOTE: DST_CDEF should never enter this if block, because
606 * updvals[i+1][0] is initialized to 'U'; unless the caller
607 * accidently specified a value for the DST_CDEF. To handle
608 * this case, an extra check is required. */
610 if((updvals[i+1][0] != 'U') &&
611 (dst_idx != DST_CDEF) &&
612 rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {
613 double rate = DNAN;
614 /* the data source type defines how to process the data */
615 /* pdp_new contains rate * time ... eg the bytes
616 * transferred during the interval. Doing it this way saves
617 * a lot of math operations */
620 switch(dst_idx){
621 case DST_COUNTER:
622 case DST_DERIVE:
623 for(ii=0;updvals[i+1][ii] != '\0';ii++){
624 if((updvals[i+1][ii] < '0' || updvals[i+1][ii] > '9') && (ii != 0 && updvals[i+1][ii] != '-')){
625 rrd_set_error("not a simple integer: '%s'",updvals[i+1]);
626 break;
627 }
628 }
629 if (rrd_test_error()){
630 break;
631 }
632 if(rrd.pdp_prep[i].last_ds[0] != 'U'){
633 pdp_new[i]= rrd_diff(updvals[i+1],rrd.pdp_prep[i].last_ds);
634 if(dst_idx == DST_COUNTER) {
635 /* simple overflow catcher suggested by Andres Kroonmaa */
636 /* this will fail terribly for non 32 or 64 bit counters ... */
637 /* are there any others in SNMP land ? */
638 if (pdp_new[i] < (double)0.0 )
639 pdp_new[i] += (double)4294967296.0 ; /* 2^32 */
640 if (pdp_new[i] < (double)0.0 )
641 pdp_new[i] += (double)18446744069414584320.0; /* 2^64-2^32 */;
642 }
643 rate = pdp_new[i] / interval;
644 }
645 else {
646 pdp_new[i]= DNAN;
647 }
648 break;
649 case DST_ABSOLUTE:
650 errno = 0;
651 pdp_new[i] = strtod(updvals[i+1],&endptr);
652 if (errno > 0){
653 rrd_set_error("converting '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
654 break;
655 };
656 if (endptr[0] != '\0'){
657 rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
658 break;
659 }
660 rate = pdp_new[i] / interval;
661 break;
662 case DST_GAUGE:
663 errno = 0;
664 pdp_new[i] = strtod(updvals[i+1],&endptr) * interval;
665 if (errno > 0){
666 rrd_set_error("converting '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
667 break;
668 };
669 if (endptr[0] != '\0'){
670 rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
671 break;
672 }
673 rate = pdp_new[i] / interval;
674 break;
675 default:
676 rrd_set_error("rrd contains unknown DS type : '%s'",
677 rrd.ds_def[i].dst);
678 break;
679 }
680 /* break out of this for loop if the error string is set */
681 if (rrd_test_error()){
682 break;
683 }
684 /* make sure pdp_temp is neither too large or too small
685 * if any of these occur it becomes unknown ...
686 * sorry folks ... */
687 if ( ! isnan(rate) &&
688 (( ! isnan(rrd.ds_def[i].par[DS_max_val].u_val) &&
689 rate > rrd.ds_def[i].par[DS_max_val].u_val ) ||
690 ( ! isnan(rrd.ds_def[i].par[DS_min_val].u_val) &&
691 rate < rrd.ds_def[i].par[DS_min_val].u_val ))){
692 pdp_new[i] = DNAN;
693 }
694 } else {
695 /* no news is news all the same */
696 pdp_new[i] = DNAN;
697 }
700 /* make a copy of the command line argument for the next run */
701 #ifdef DEBUG
702 fprintf(stderr,
703 "prep ds[%lu]\t"
704 "last_arg '%s'\t"
705 "this_arg '%s'\t"
706 "pdp_new %10.2f\n",
707 i,
708 rrd.pdp_prep[i].last_ds,
709 updvals[i+1], pdp_new[i]);
710 #endif
711 strncpy(rrd.pdp_prep[i].last_ds, updvals[i+1],LAST_DS_LEN-1);
712 rrd.pdp_prep[i].last_ds[LAST_DS_LEN-1]='\0';
713 }
714 /* break out of the argument parsing loop if the error_string is set */
715 if (rrd_test_error()){
716 free(step_start);
717 break;
718 }
719 /* has a pdp_st moment occurred since the last run ? */
721 if (proc_pdp_st == occu_pdp_st){
722 /* no we have not passed a pdp_st moment. therefore update is simple */
724 for(i=0;i<rrd.stat_head->ds_cnt;i++){
725 if(isnan(pdp_new[i])) {
726 /* this is not realy accurate if we use subsecond data arival time
727 should have thought of it when going subsecond resolution ...
728 sorry next format change we will have it! */
729 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += floor(interval);
730 } else {
731 if (isnan( rrd.pdp_prep[i].scratch[PDP_val].u_val )){
732 rrd.pdp_prep[i].scratch[PDP_val].u_val= pdp_new[i];
733 } else {
734 rrd.pdp_prep[i].scratch[PDP_val].u_val+= pdp_new[i];
735 }
736 }
737 #ifdef DEBUG
738 fprintf(stderr,
739 "NO PDP ds[%lu]\t"
740 "value %10.2f\t"
741 "unkn_sec %5lu\n",
742 i,
743 rrd.pdp_prep[i].scratch[PDP_val].u_val,
744 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
745 #endif
746 }
747 } else {
748 /* an pdp_st has occurred. */
750 /* in pdp_prep[].scratch[PDP_val].u_val we have collected rate*seconds which
751 * occurred up to the last run.
752 pdp_new[] contains rate*seconds from the latest run.
753 pdp_temp[] will contain the rate for cdp */
755 for(i=0;i<rrd.stat_head->ds_cnt;i++){
756 /* update pdp_prep to the current pdp_st. */
757 double pre_unknown = 0.0;
758 if(isnan(pdp_new[i]))
759 /* a final bit of unkonwn to be added bevore calculation
760 * we use a tempaorary variable for this so that we
761 * don't have to turn integer lines before using the value */
762 pre_unknown = pre_int;
763 else {
764 if (isnan( rrd.pdp_prep[i].scratch[PDP_val].u_val )){
765 rrd.pdp_prep[i].scratch[PDP_val].u_val= pdp_new[i]/interval*pre_int;
766 } else {
767 rrd.pdp_prep[i].scratch[PDP_val].u_val+= pdp_new[i]/interval*pre_int;
768 }
769 }
772 /* if too much of the pdp_prep is unknown we dump it */
773 if (
774 /* removed because this does not agree with the definition
775 a heart beat can be unknown */
776 /* (rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt
777 > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) || */
778 /* if the interval is larger thatn mrhb we get NAN */
779 (interval > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) ||
780 (occu_pdp_st-proc_pdp_st <=
781 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)) {
782 pdp_temp[i] = DNAN;
783 } else {
784 pdp_temp[i] = rrd.pdp_prep[i].scratch[PDP_val].u_val
785 / ((double)(occu_pdp_st - proc_pdp_st
786 - rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)
787 -pre_unknown);
788 }
790 /* process CDEF data sources; remember each CDEF DS can
791 * only reference other DS with a lower index number */
792 if (dst_conv(rrd.ds_def[i].dst) == DST_CDEF) {
793 rpnp_t *rpnp;
794 rpnp = rpn_expand((rpn_cdefds_t *) &(rrd.ds_def[i].par[DS_cdef]));
795 /* substitue data values for OP_VARIABLE nodes */
796 for (ii = 0; rpnp[ii].op != OP_END; ii++)
797 {
798 if (rpnp[ii].op == OP_VARIABLE) {
799 rpnp[ii].op = OP_NUMBER;
800 rpnp[ii].val = pdp_temp[rpnp[ii].ptr];
801 }
802 }
803 /* run the rpn calculator */
804 if (rpn_calc(rpnp,&rpnstack,0,pdp_temp,i) == -1) {
805 free(rpnp);
806 break; /* exits the data sources pdp_temp loop */
807 }
808 }
810 /* make pdp_prep ready for the next run */
811 if(isnan(pdp_new[i])){
812 /* this is not realy accurate if we use subsecond data arival time
813 should have thought of it when going subsecond resolution ...
814 sorry next format change we will have it! */
815 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = floor(post_int);
816 rrd.pdp_prep[i].scratch[PDP_val].u_val = DNAN;
817 } else {
818 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = 0;
819 rrd.pdp_prep[i].scratch[PDP_val].u_val =
820 pdp_new[i]/interval*post_int;
821 }
823 #ifdef DEBUG
824 fprintf(stderr,
825 "PDP UPD ds[%lu]\t"
826 "pdp_temp %10.2f\t"
827 "new_prep %10.2f\t"
828 "new_unkn_sec %5lu\n",
829 i, pdp_temp[i],
830 rrd.pdp_prep[i].scratch[PDP_val].u_val,
831 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
832 #endif
833 }
835 /* if there were errors during the last loop, bail out here */
836 if (rrd_test_error()){
837 free(step_start);
838 break;
839 }
841 /* compute the number of elapsed pdp_st moments */
842 elapsed_pdp_st = (occu_pdp_st - proc_pdp_st) / rrd.stat_head -> pdp_step;
843 #ifdef DEBUG
844 fprintf(stderr,"elapsed PDP steps: %lu\n", elapsed_pdp_st);
845 #endif
846 if (rra_step_cnt == NULL)
847 {
848 rra_step_cnt = (unsigned long *)
849 malloc((rrd.stat_head->rra_cnt)* sizeof(unsigned long));
850 }
852 for(i = 0, rra_start = rra_begin;
853 i < rrd.stat_head->rra_cnt;
854 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
855 i++)
856 {
857 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
858 start_pdp_offset = rrd.rra_def[i].pdp_cnt -
859 (proc_pdp_st / rrd.stat_head -> pdp_step) % rrd.rra_def[i].pdp_cnt;
860 if (start_pdp_offset <= elapsed_pdp_st) {
861 rra_step_cnt[i] = (elapsed_pdp_st - start_pdp_offset) /
862 rrd.rra_def[i].pdp_cnt + 1;
863 } else {
864 rra_step_cnt[i] = 0;
865 }
867 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
868 {
869 /* If this is a bulk update, we need to skip ahead in the seasonal
870 * arrays so that they will be correct for the next observed value;
871 * note that for the bulk update itself, no update will occur to
872 * DEVSEASONAL or SEASONAL; futhermore, HWPREDICT and DEVPREDICT will
873 * be set to DNAN. */
874 if (rra_step_cnt[i] > 2)
875 {
876 /* skip update by resetting rra_step_cnt[i],
877 * note that this is not data source specific; this is due
878 * to the bulk update, not a DNAN value for the specific data
879 * source. */
880 rra_step_cnt[i] = 0;
881 lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st,
882 &last_seasonal_coef);
883 lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st + 1,
884 &seasonal_coef);
885 }
887 /* periodically run a smoother for seasonal effects */
888 /* Need to use first cdp parameter buffer to track
889 * burnin (burnin requires a specific smoothing schedule).
890 * The CDP_init_seasonal parameter is really an RRA level,
891 * not a data source within RRA level parameter, but the rra_def
892 * is read only for rrd_update (not flushed to disk). */
893 iii = i*(rrd.stat_head -> ds_cnt);
894 if (rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt
895 <= BURNIN_CYCLES)
896 {
897 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
898 > rrd.rra_def[i].row_cnt - 1) {
899 /* mark off one of the burnin cycles */
900 ++(rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt);
901 schedule_smooth = 1;
902 }
903 } else {
904 /* someone has no doubt invented a trick to deal with this
905 * wrap around, but at least this code is clear. */
906 if (rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt >
907 rrd.rra_ptr[i].cur_row)
908 {
909 /* here elapsed_pdp_st = rra_step_cnt[i] because of 1-1
910 * mapping between PDP and CDP */
911 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
912 >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
913 {
914 #ifdef DEBUG
915 fprintf(stderr,
916 "schedule_smooth 1: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
917 rrd.rra_ptr[i].cur_row, elapsed_pdp_st,
918 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
919 #endif
920 schedule_smooth = 1;
921 }
922 } else {
923 /* can't rely on negative numbers because we are working with
924 * unsigned values */
925 /* Don't need modulus here. If we've wrapped more than once, only
926 * one smooth is executed at the end. */
927 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st >= rrd.rra_def[i].row_cnt
928 && rrd.rra_ptr[i].cur_row + elapsed_pdp_st - rrd.rra_def[i].row_cnt
929 >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
930 {
931 #ifdef DEBUG
932 fprintf(stderr,
933 "schedule_smooth 2: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
934 rrd.rra_ptr[i].cur_row, elapsed_pdp_st,
935 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
936 #endif
937 schedule_smooth = 1;
938 }
939 }
940 }
942 rra_current = ftell(rrd_file);
943 } /* if cf is DEVSEASONAL or SEASONAL */
945 if (rrd_test_error()) break;
947 /* update CDP_PREP areas */
948 /* loop over data soures within each RRA */
949 for(ii = 0;
950 ii < rrd.stat_head->ds_cnt;
951 ii++)
952 {
954 /* iii indexes the CDP prep area for this data source within the RRA */
955 iii=i*rrd.stat_head->ds_cnt+ii;
957 if (rrd.rra_def[i].pdp_cnt > 1) {
959 if (rra_step_cnt[i] > 0) {
960 /* If we are in this block, as least 1 CDP value will be written to
961 * disk, this is the CDP_primary_val entry. If more than 1 value needs
962 * to be written, then the "fill in" value is the CDP_secondary_val
963 * entry. */
964 if (isnan(pdp_temp[ii]))
965 {
966 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += start_pdp_offset;
967 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
968 } else {
969 /* CDP_secondary value is the RRA "fill in" value for intermediary
970 * CDP data entries. No matter the CF, the value is the same because
971 * the average, max, min, and last of a list of identical values is
972 * the same, namely, the value itself. */
973 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = pdp_temp[ii];
974 }
976 if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt
977 > rrd.rra_def[i].pdp_cnt*
978 rrd.rra_def[i].par[RRA_cdp_xff_val].u_val)
979 {
980 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
981 /* initialize carry over */
982 if (current_cf == CF_AVERAGE) {
983 if (isnan(pdp_temp[ii])) {
984 rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
985 } else {
986 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
987 ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
988 }
989 } else {
990 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
991 }
992 } else {
993 rrd_value_t cum_val, cur_val;
994 switch (current_cf) {
995 case CF_AVERAGE:
996 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, 0.0);
997 cur_val = IFDNAN(pdp_temp[ii],0.0);
998 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val =
999 (cum_val + cur_val * start_pdp_offset) /
1000 (rrd.rra_def[i].pdp_cnt
1001 -rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt);
1002 /* initialize carry over value */
1003 if (isnan(pdp_temp[ii])) {
1004 rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
1005 } else {
1006 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
1007 ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
1008 }
1009 break;
1010 case CF_MAXIMUM:
1011 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, -DINF);
1012 cur_val = IFDNAN(pdp_temp[ii],-DINF);
1013 #ifdef DEBUG
1014 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
1015 isnan(pdp_temp[ii])) {
1016 fprintf(stderr,
1017 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1018 i,ii);
1019 exit(-1);
1020 }
1021 #endif
1022 if (cur_val > cum_val)
1023 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
1024 else
1025 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
1026 /* initialize carry over value */
1027 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1028 break;
1029 case CF_MINIMUM:
1030 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, DINF);
1031 cur_val = IFDNAN(pdp_temp[ii],DINF);
1032 #ifdef DEBUG
1033 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
1034 isnan(pdp_temp[ii])) {
1035 fprintf(stderr,
1036 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1037 i,ii);
1038 exit(-1);
1039 }
1040 #endif
1041 if (cur_val < cum_val)
1042 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
1043 else
1044 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
1045 /* initialize carry over value */
1046 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1047 break;
1048 case CF_LAST:
1049 default:
1050 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = pdp_temp[ii];
1051 /* initialize carry over value */
1052 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1053 break;
1054 }
1055 } /* endif meets xff value requirement for a valid value */
1056 /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
1057 * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1058 if (isnan(pdp_temp[ii]))
1059 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt =
1060 (elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt;
1061 else
1062 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 0;
1063 } else /* rra_step_cnt[i] == 0 */
1064 {
1065 #ifdef DEBUG
1066 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val)) {
1067 fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, DNAN\n",
1068 i,ii);
1069 } else {
1070 fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, %10.2f\n",
1071 i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1072 }
1073 #endif
1074 if (isnan(pdp_temp[ii])) {
1075 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += elapsed_pdp_st;
1076 } else if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val))
1077 {
1078 if (current_cf == CF_AVERAGE) {
1079 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
1080 elapsed_pdp_st;
1081 } else {
1082 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1083 }
1084 #ifdef DEBUG
1085 fprintf(stderr,"Initialize CDP_val for RRA %lu DS %lu: %10.2f\n",
1086 i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1087 #endif
1088 } else {
1089 switch (current_cf) {
1090 case CF_AVERAGE:
1091 rrd.cdp_prep[iii].scratch[CDP_val].u_val += pdp_temp[ii] *
1092 elapsed_pdp_st;
1093 break;
1094 case CF_MINIMUM:
1095 if (pdp_temp[ii] < rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1096 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1097 break;
1098 case CF_MAXIMUM:
1099 if (pdp_temp[ii] > rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1100 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1101 break;
1102 case CF_LAST:
1103 default:
1104 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1105 break;
1106 }
1107 }
1108 }
1109 } else { /* rrd.rra_def[i].pdp_cnt == 1 */
1110 if (elapsed_pdp_st > 2)
1111 {
1112 switch (current_cf) {
1113 case CF_AVERAGE:
1114 default:
1115 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val=pdp_temp[ii];
1116 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val=pdp_temp[ii];
1117 break;
1118 case CF_SEASONAL:
1119 case CF_DEVSEASONAL:
1120 /* need to update cached seasonal values, so they are consistent
1121 * with the bulk update */
1122 /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1123 * CDP_last_deviation are the same. */
1124 rrd.cdp_prep[iii].scratch[CDP_hw_last_seasonal].u_val =
1125 last_seasonal_coef[ii];
1126 rrd.cdp_prep[iii].scratch[CDP_hw_seasonal].u_val =
1127 seasonal_coef[ii];
1128 break;
1129 case CF_HWPREDICT:
1130 /* need to update the null_count and last_null_count.
1131 * even do this for non-DNAN pdp_temp because the
1132 * algorithm is not learning from batch updates. */
1133 rrd.cdp_prep[iii].scratch[CDP_null_count].u_cnt +=
1134 elapsed_pdp_st;
1135 rrd.cdp_prep[iii].scratch[CDP_last_null_count].u_cnt +=
1136 elapsed_pdp_st - 1;
1137 /* fall through */
1138 case CF_DEVPREDICT:
1139 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
1140 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
1141 break;
1142 case CF_FAILURES:
1143 /* do not count missed bulk values as failures */
1144 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = 0;
1145 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = 0;
1146 /* need to reset violations buffer.
1147 * could do this more carefully, but for now, just
1148 * assume a bulk update wipes away all violations. */
1149 erase_violations(&rrd, iii, i);
1150 break;
1151 }
1152 }
1153 } /* endif rrd.rra_def[i].pdp_cnt == 1 */
1155 if (rrd_test_error()) break;
1157 } /* endif data sources loop */
1158 } /* end RRA Loop */
1160 /* this loop is only entered if elapsed_pdp_st < 3 */
1161 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1162 j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val)
1163 {
1164 for(i = 0, rra_start = rra_begin;
1165 i < rrd.stat_head->rra_cnt;
1166 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1167 i++)
1168 {
1169 if (rrd.rra_def[i].pdp_cnt > 1) continue;
1171 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
1172 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
1173 {
1174 lookup_seasonal(&rrd,i,rra_start,rrd_file,
1175 elapsed_pdp_st + (scratch_idx == CDP_primary_val ? 1 : 2),
1176 &seasonal_coef);
1177 rra_current = ftell(rrd_file);
1178 }
1179 if (rrd_test_error()) break;
1180 /* loop over data soures within each RRA */
1181 for(ii = 0;
1182 ii < rrd.stat_head->ds_cnt;
1183 ii++)
1184 {
1185 update_aberrant_CF(&rrd,pdp_temp[ii],current_cf,
1186 i*(rrd.stat_head->ds_cnt) + ii,i,ii,
1187 scratch_idx, seasonal_coef);
1188 }
1189 } /* end RRA Loop */
1190 if (rrd_test_error()) break;
1191 } /* end elapsed_pdp_st loop */
1193 if (rrd_test_error()) break;
1195 /* Ready to write to disk */
1196 /* Move sequentially through the file, writing one RRA at a time.
1197 * Note this architecture divorces the computation of CDP with
1198 * flushing updated RRA entries to disk. */
1199 for(i = 0, rra_start = rra_begin;
1200 i < rrd.stat_head->rra_cnt;
1201 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1202 i++) {
1203 /* is th5Aere anything to write for this RRA? If not, continue. */
1204 if (rra_step_cnt[i] == 0) continue;
1206 /* write the first row */
1207 #ifdef DEBUG
1208 fprintf(stderr," -- RRA Preseek %ld\n",ftell(rrd_file));
1209 #endif
1210 rrd.rra_ptr[i].cur_row++;
1211 if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
1212 rrd.rra_ptr[i].cur_row = 0; /* wrap around */
1213 /* positition on the first row */
1214 rra_pos_tmp = rra_start +
1215 (rrd.stat_head->ds_cnt)*(rrd.rra_ptr[i].cur_row)*sizeof(rrd_value_t);
1216 if(rra_pos_tmp != rra_current) {
1217 #ifndef HAVE_MMAP
1218 if(fseek(rrd_file, rra_pos_tmp, SEEK_SET) != 0){
1219 rrd_set_error("seek error in rrd");
1220 break;
1221 }
1222 #endif
1223 rra_current = rra_pos_tmp;
1224 }
1226 #ifdef DEBUG
1227 fprintf(stderr," -- RRA Postseek %ld\n",ftell(rrd_file));
1228 #endif
1229 scratch_idx = CDP_primary_val;
1230 if (pcdp_summary != NULL)
1231 {
1232 rra_time = (current_time - current_time
1233 % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1234 - ((rra_step_cnt[i]-1)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1235 }
1236 #ifdef HAVE_MMAP
1237 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1238 pcdp_summary, &rra_time, rrd_mmaped_file);
1239 #else
1240 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1241 pcdp_summary, &rra_time);
1242 #endif
1243 if (rrd_test_error()) break;
1245 /* write other rows of the bulk update, if any */
1246 scratch_idx = CDP_secondary_val;
1247 for ( ; rra_step_cnt[i] > 1; rra_step_cnt[i]--)
1248 {
1249 if (++rrd.rra_ptr[i].cur_row == rrd.rra_def[i].row_cnt)
1250 {
1251 #ifdef DEBUG
1252 fprintf(stderr,"Wraparound for RRA %s, %lu updates left\n",
1253 rrd.rra_def[i].cf_nam, rra_step_cnt[i] - 1);
1254 #endif
1255 /* wrap */
1256 rrd.rra_ptr[i].cur_row = 0;
1257 /* seek back to beginning of current rra */
1258 if (fseek(rrd_file, rra_start, SEEK_SET) != 0)
1259 {
1260 rrd_set_error("seek error in rrd");
1261 break;
1262 }
1263 #ifdef DEBUG
1264 fprintf(stderr," -- Wraparound Postseek %ld\n",ftell(rrd_file));
1265 #endif
1266 rra_current = rra_start;
1267 }
1268 if (pcdp_summary != NULL)
1269 {
1270 rra_time = (current_time - current_time
1271 % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1272 - ((rra_step_cnt[i]-2)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1273 }
1274 #ifdef HAVE_MMAP
1275 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1276 pcdp_summary, &rra_time, rrd_mmaped_file);
1277 #else
1278 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1279 pcdp_summary, &rra_time);
1280 #endif
1281 }
1283 if (rrd_test_error())
1284 break;
1285 } /* RRA LOOP */
1287 /* break out of the argument parsing loop if error_string is set */
1288 if (rrd_test_error()){
1289 free(step_start);
1290 break;
1291 }
1293 } /* endif a pdp_st has occurred */
1294 rrd.live_head->last_up = current_time;
1295 rrd.live_head->last_up_usec = current_time_usec;
1296 free(step_start);
1297 } /* function argument loop */
1299 if (seasonal_coef != NULL) free(seasonal_coef);
1300 if (last_seasonal_coef != NULL) free(last_seasonal_coef);
1301 if (rra_step_cnt != NULL) free(rra_step_cnt);
1302 rpnstack_free(&rpnstack);
1304 #ifdef HAVE_MMAP
1305 if (munmap(rrd_mmaped_file, rrd_filesize) == -1) {
1306 rrd_set_error("error writing(unmapping) file: %s", filename);
1307 }
1308 #endif
1309 /* if we got here and if there is an error and if the file has not been
1310 * written to, then close things up and return. */
1311 if (rrd_test_error()) {
1312 free(updvals);
1313 free(tmpl_idx);
1314 rrd_free(&rrd);
1315 free(pdp_temp);
1316 free(pdp_new);
1317 fclose(rrd_file);
1318 return(-1);
1319 }
1321 /* aargh ... that was tough ... so many loops ... anyway, its done.
1322 * we just need to write back the live header portion now*/
1324 if (fseek(rrd_file, (sizeof(stat_head_t)
1325 + sizeof(ds_def_t)*rrd.stat_head->ds_cnt
1326 + sizeof(rra_def_t)*rrd.stat_head->rra_cnt),
1327 SEEK_SET) != 0) {
1328 rrd_set_error("seek rrd for live header writeback");
1329 free(updvals);
1330 free(tmpl_idx);
1331 rrd_free(&rrd);
1332 free(pdp_temp);
1333 free(pdp_new);
1334 fclose(rrd_file);
1335 return(-1);
1336 }
1338 if(version >= 3) {
1339 if(fwrite( rrd.live_head,
1340 sizeof(live_head_t), 1, rrd_file) != 1){
1341 rrd_set_error("fwrite live_head to rrd");
1342 free(updvals);
1343 rrd_free(&rrd);
1344 free(tmpl_idx);
1345 free(pdp_temp);
1346 free(pdp_new);
1347 fclose(rrd_file);
1348 return(-1);
1349 }
1350 }
1351 else {
1352 if(fwrite( &rrd.live_head->last_up,
1353 sizeof(time_t), 1, rrd_file) != 1){
1354 rrd_set_error("fwrite live_head to rrd");
1355 free(updvals);
1356 rrd_free(&rrd);
1357 free(tmpl_idx);
1358 free(pdp_temp);
1359 free(pdp_new);
1360 fclose(rrd_file);
1361 return(-1);
1362 }
1363 }
1366 if(fwrite( rrd.pdp_prep,
1367 sizeof(pdp_prep_t),
1368 rrd.stat_head->ds_cnt, rrd_file) != rrd.stat_head->ds_cnt){
1369 rrd_set_error("ftwrite pdp_prep to rrd");
1370 free(updvals);
1371 rrd_free(&rrd);
1372 free(tmpl_idx);
1373 free(pdp_temp);
1374 free(pdp_new);
1375 fclose(rrd_file);
1376 return(-1);
1377 }
1379 if(fwrite( rrd.cdp_prep,
1380 sizeof(cdp_prep_t),
1381 rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt, rrd_file)
1382 != rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt){
1384 rrd_set_error("ftwrite cdp_prep to rrd");
1385 free(updvals);
1386 free(tmpl_idx);
1387 rrd_free(&rrd);
1388 free(pdp_temp);
1389 free(pdp_new);
1390 fclose(rrd_file);
1391 return(-1);
1392 }
1394 if(fwrite( rrd.rra_ptr,
1395 sizeof(rra_ptr_t),
1396 rrd.stat_head->rra_cnt,rrd_file) != rrd.stat_head->rra_cnt){
1397 rrd_set_error("fwrite rra_ptr to rrd");
1398 free(updvals);
1399 free(tmpl_idx);
1400 rrd_free(&rrd);
1401 free(pdp_temp);
1402 free(pdp_new);
1403 fclose(rrd_file);
1404 return(-1);
1405 }
1407 /* OK now close the files and free the memory */
1408 if(fclose(rrd_file) != 0){
1409 rrd_set_error("closing rrd");
1410 free(updvals);
1411 free(tmpl_idx);
1412 rrd_free(&rrd);
1413 free(pdp_temp);
1414 free(pdp_new);
1415 return(-1);
1416 }
1418 /* calling the smoothing code here guarantees at most
1419 * one smoothing operation per rrd_update call. Unfortunately,
1420 * it is possible with bulk updates, or a long-delayed update
1421 * for smoothing to occur off-schedule. This really isn't
1422 * critical except during the burning cycles. */
1423 if (schedule_smooth)
1424 {
1425 rrd_file = fopen(filename,"rb+");
1428 rra_start = rra_begin;
1429 for (i = 0; i < rrd.stat_head -> rra_cnt; ++i)
1430 {
1431 if (cf_conv(rrd.rra_def[i].cf_nam) == CF_DEVSEASONAL ||
1432 cf_conv(rrd.rra_def[i].cf_nam) == CF_SEASONAL)
1433 {
1434 #ifdef DEBUG
1435 fprintf(stderr,"Running smoother for rra %ld\n",i);
1436 #endif
1437 apply_smoother(&rrd,i,rra_start,rrd_file);
1438 if (rrd_test_error())
1439 break;
1440 }
1441 rra_start += rrd.rra_def[i].row_cnt
1442 *rrd.stat_head->ds_cnt*sizeof(rrd_value_t);
1443 }
1444 fclose(rrd_file);
1445 }
1446 rrd_free(&rrd);
1447 free(updvals);
1448 free(tmpl_idx);
1449 free(pdp_new);
1450 free(pdp_temp);
1451 return(0);
1452 }
1454 /*
1455 * get exclusive lock to whole file.
1456 * lock gets removed when we close the file
1457 *
1458 * returns 0 on success
1459 */
1460 int
1461 LockRRD(FILE *rrdfile)
1462 {
1463 int rrd_fd; /* File descriptor for RRD */
1464 int rcstat;
1466 rrd_fd = fileno(rrdfile);
1468 {
1469 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
1470 struct _stat st;
1472 if ( _fstat( rrd_fd, &st ) == 0 ) {
1473 rcstat = _locking ( rrd_fd, _LK_NBLCK, st.st_size );
1474 } else {
1475 rcstat = -1;
1476 }
1477 #else
1478 struct flock lock;
1479 lock.l_type = F_WRLCK; /* exclusive write lock */
1480 lock.l_len = 0; /* whole file */
1481 lock.l_start = 0; /* start of file */
1482 lock.l_whence = SEEK_SET; /* end of file */
1484 rcstat = fcntl(rrd_fd, F_SETLK, &lock);
1485 #endif
1486 }
1488 return(rcstat);
1489 }
1492 #ifdef HAVE_MMAP
1493 info_t
1494 *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1495 unsigned short CDP_scratch_idx,
1496 #ifndef DEBUG
1497 FILE UNUSED(*rrd_file),
1498 #else
1499 FILE *rrd_file,
1500 #endif
1501 info_t *pcdp_summary, time_t *rra_time, void *rrd_mmaped_file)
1502 #else
1503 info_t
1504 *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1505 unsigned short CDP_scratch_idx, FILE *rrd_file,
1506 info_t *pcdp_summary, time_t *rra_time)
1507 #endif
1508 {
1509 unsigned long ds_idx, cdp_idx;
1510 infoval iv;
1512 for (ds_idx = 0; ds_idx < rrd -> stat_head -> ds_cnt; ds_idx++)
1513 {
1514 /* compute the cdp index */
1515 cdp_idx =rra_idx * (rrd -> stat_head->ds_cnt) + ds_idx;
1516 #ifdef DEBUG
1517 fprintf(stderr," -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1518 rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,ftell(rrd_file),
1519 rrd -> rra_def[rra_idx].cf_nam);
1520 #endif
1521 if (pcdp_summary != NULL)
1522 {
1523 iv.u_val = rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
1524 /* append info to the return hash */
1525 pcdp_summary = info_push(pcdp_summary,
1526 sprintf_alloc("[%d]RRA[%s][%lu]DS[%s]",
1527 *rra_time, rrd->rra_def[rra_idx].cf_nam,
1528 rrd->rra_def[rra_idx].pdp_cnt, rrd->ds_def[ds_idx].ds_nam),
1529 RD_I_VAL, iv);
1530 }
1531 #ifdef HAVE_MMAP
1532 memcpy((char *)rrd_mmaped_file + *rra_current,
1533 &(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1534 sizeof(rrd_value_t));
1535 #else
1536 if(fwrite(&(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1537 sizeof(rrd_value_t),1,rrd_file) != 1)
1538 {
1539 rrd_set_error("writing rrd");
1540 return 0;
1541 }
1542 #endif
1543 *rra_current += sizeof(rrd_value_t);
1544 }
1545 return (pcdp_summary);
1546 }