1 /*****************************************************************************
2 * RRDtool 1.2.23 Copyright by Tobi Oetiker, 1997-2007
3 *****************************************************************************
4 * rrd_update.c RRD Update Function
5 *****************************************************************************
6 * $Id$
7 *****************************************************************************/
9 #include "rrd_tool.h"
10 #include <sys/types.h>
11 #include <fcntl.h>
12 #ifdef HAVE_MMAP
13 #include <sys/mman.h>
14 #endif
16 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
17 #include <sys/locking.h>
18 #include <sys/stat.h>
19 #include <io.h>
20 #endif
22 #include "rrd_hw.h"
23 #include "rrd_rpncalc.h"
25 #include "rrd_is_thread_safe.h"
26 #include "unused.h"
28 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
29 /*
30 * WIN32 does not have gettimeofday and struct timeval. This is a quick and dirty
31 * replacement.
32 */
33 #include <sys/timeb.h>
35 #ifndef __MINGW32__
/*
 * Replacement for struct timeval, used by the gettimeofday()
 * substitute in this file.
 */
struct timeval {
    time_t tv_sec;      /* whole seconds */
    long   tv_usec;     /* microseconds */
};
40 #endif
/*
 * Timezone argument type accepted by the gettimeofday() substitute
 * in this file.
 */
struct __timezone {
    int tz_minuteswest;     /* minutes W of Greenwich */
    int tz_dsttime;         /* type of dst correction */
};
47 static int gettimeofday(struct timeval *t, struct __timezone *tz) {
49 struct _timeb current_time;
51 _ftime(¤t_time);
53 t->tv_sec = current_time.time;
54 t->tv_usec = current_time.millitm * 1000;
56 return 0;
57 }
59 #endif
/*
 * Normalize a time as returned by gettimeofday(): on return the
 * usec part is always >= 0.
 *
 * While tv_usec is negative, one second is borrowed from tv_sec and
 * 1000000 microseconds are added to tv_usec.  The borrow is repeated,
 * so values at or below -1000000 are brought up to >= 0 as well.
 * A tv_usec that is already >= 0 is left untouched.
 */
static void normalize_time(struct timeval *t)
{
    while (t->tv_usec < 0) {
        t->tv_sec--;
        t->tv_usec += 1000000L;
    }
}
72 /* Local prototypes */
73 int LockRRD(FILE *rrd_file);
74 #ifdef HAVE_MMAP
75 info_t *write_RRA_row (rrd_t *rrd, unsigned long rra_idx,
76 unsigned long *rra_current,
77 unsigned short CDP_scratch_idx,
78 #ifndef DEBUG
79 FILE UNUSED(*rrd_file),
80 #else
81 FILE *rrd_file,
82 #endif
83 info_t *pcdp_summary, time_t *rra_time, void *rrd_mmaped_file);
84 #else
85 info_t *write_RRA_row (rrd_t *rrd, unsigned long rra_idx,
86 unsigned long *rra_current,
87 unsigned short CDP_scratch_idx, FILE *rrd_file,
88 info_t *pcdp_summary, time_t *rra_time);
89 #endif
90 int rrd_update_r(const char *filename, const char *tmplt, int argc, const char **argv);
91 int _rrd_update(const char *filename, const char *tmplt, int argc, const char **argv,
92 info_t*);
/*
 * IFDNAN(X,Y): yields Y when X is NaN, otherwise X.
 *
 * The expansion is a plain parenthesized expression with no trailing
 * semicolon, so it can be used inside larger expressions as well as in
 * "v = IFDNAN(a, b);" statements.
 * X is evaluated twice; do not pass an argument with side effects.
 */
#define IFDNAN(X,Y) (isnan(X) ? (Y) : (X))
97 info_t *rrd_update_v(int argc, char **argv)
98 {
99 char *tmplt = NULL;
100 info_t *result = NULL;
101 infoval rc;
102 rc.u_int = -1;
103 optind = 0; opterr = 0; /* initialize getopt */
105 while (1) {
106 static struct option long_options[] =
107 {
108 {"template", required_argument, 0, 't'},
109 {0,0,0,0}
110 };
111 int option_index = 0;
112 int opt;
113 opt = getopt_long(argc, argv, "t:",
114 long_options, &option_index);
116 if (opt == EOF)
117 break;
119 switch(opt) {
120 case 't':
121 tmplt = optarg;
122 break;
124 case '?':
125 rrd_set_error("unknown option '%s'",argv[optind-1]);
126 goto end_tag;
127 }
128 }
130 /* need at least 2 arguments: filename, data. */
131 if (argc-optind < 2) {
132 rrd_set_error("Not enough arguments");
133 goto end_tag;
134 }
135 rc.u_int = 0;
136 result = info_push(NULL,sprintf_alloc("return_value"),RD_I_INT,rc);
137 rc.u_int = _rrd_update(argv[optind], tmplt,
138 argc - optind - 1, (const char **)(argv + optind + 1), result);
139 result->value.u_int = rc.u_int;
140 end_tag:
141 return result;
142 }
/*
 * Command-line style update.
 *
 * Accepts an optional "-t <template>" / "--template <template>" option.
 * After option parsing at least two arguments must remain: the RRD
 * filename followed by one or more data arguments.
 *
 * Returns -1 (after recording an error with rrd_set_error()) on an
 * unknown option or too few arguments; otherwise returns whatever
 * rrd_update_r() returns for the filename, template and data arguments.
 */
int rrd_update(int argc, char **argv)
{
    static struct option long_options[] = {
        {"template", required_argument, 0, 't'},
        {0, 0, 0, 0}
    };
    char *tmplt = NULL;
    int   option_index = 0;
    int   opt;

    /* initialize getopt */
    optind = 0;
    opterr = 0;

    while ((opt = getopt_long(argc, argv, "t:",
                              long_options, &option_index)) != EOF) {
        if (opt == '?') {
            rrd_set_error("unknown option '%s'", argv[optind - 1]);
            return (-1);
        }
        if (opt == 't')
            tmplt = optarg;
    }

    /* need at least 2 arguments: filename, data. */
    if (argc - optind < 2) {
        rrd_set_error("Not enough arguments");
        return -1;
    }

    /* argv[optind] is the filename; everything after it is data */
    return rrd_update_r(argv[optind], tmplt,
                        argc - optind - 1,
                        (const char **) (argv + optind + 1));
}
/*
 * Update entry point that takes the filename and template directly
 * instead of parsing them from a command line.
 *
 * Forwards all four arguments unchanged to _rrd_update(), passing NULL
 * as the info list, and returns its result.
 */
int rrd_update_r(const char *filename, const char *tmplt,
                 int argc, const char **argv)
{
    int status;

    status = _rrd_update(filename, tmplt, argc, argv, NULL);
    return status;
}
194 int
195 _rrd_update(const char *filename, const char *tmplt, int argc, const char **argv,
196 info_t *pcdp_summary)
197 {
199 int arg_i = 2;
200 short j;
201 unsigned long i,ii,iii=1;
203 unsigned long rra_begin; /* byte pointer to the rra
204 * area in the rrd file. this
205 * pointer never changes value */
206 unsigned long rra_start; /* byte pointer to the rra
207 * area in the rrd file. this
208 * pointer changes as each rrd is
209 * processed. */
210 unsigned long rra_current; /* byte pointer to the current write
211 * spot in the rrd file. */
212 unsigned long rra_pos_tmp; /* temporary byte pointer. */
213 double interval,
214 pre_int,post_int; /* interval between this and
215 * the last run */
216 unsigned long proc_pdp_st; /* which pdp_st was the last
217 * to be processed */
218 unsigned long occu_pdp_st; /* when was the pdp_st
219 * before the last update
220 * time */
221 unsigned long proc_pdp_age; /* how old was the data in
222 * the pdp prep area when it
223 * was last updated */
224 unsigned long occu_pdp_age; /* how long ago was the last
225 * pdp_step time */
226 rrd_value_t *pdp_new; /* prepare the incoming data
227 * to be added the the
228 * existing entry */
229 rrd_value_t *pdp_temp; /* prepare the pdp values
230 * to be added the the
231 * cdp values */
233 long *tmpl_idx; /* index representing the settings
234 transported by the tmplt index */
235 unsigned long tmpl_cnt = 2; /* time and data */
237 FILE *rrd_file;
238 rrd_t rrd;
239 time_t current_time = 0;
240 time_t rra_time = 0; /* time of update for a RRA */
241 unsigned long current_time_usec=0;/* microseconds part of current time */
242 struct timeval tmp_time; /* used for time conversion */
244 char **updvals;
245 int schedule_smooth = 0;
246 rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
247 /* a vector of future Holt-Winters seasonal coefs */
248 unsigned long elapsed_pdp_st;
249 /* number of elapsed PDP steps since last update */
250 unsigned long *rra_step_cnt = NULL;
251 /* number of rows to be updated in an RRA for a data
252 * value. */
253 unsigned long start_pdp_offset;
254 /* number of PDP steps since the last update that
255 * are assigned to the first CDP to be generated
256 * since the last update. */
257 unsigned short scratch_idx;
258 /* index into the CDP scratch array */
259 enum cf_en current_cf;
260 /* numeric id of the current consolidation function */
261 rpnstack_t rpnstack; /* used for COMPUTE DS */
262 int version; /* rrd version */
263 char *endptr; /* used in the conversion */
265 #ifdef HAVE_MMAP
266 void *rrd_mmaped_file;
267 unsigned long rrd_filesize;
268 #endif
271 rpnstack_init(&rpnstack);
273 /* need at least 1 arguments: data. */
274 if (argc < 1) {
275 rrd_set_error("Not enough arguments");
276 return -1;
277 }
281 if(rrd_open(filename,&rrd_file,&rrd, RRD_READWRITE)==-1){
282 return -1;
283 }
285 /* initialize time */
286 version = atoi(rrd.stat_head->version);
287 gettimeofday(&tmp_time, 0);
288 normalize_time(&tmp_time);
289 current_time = tmp_time.tv_sec;
290 if(version >= 3) {
291 current_time_usec = tmp_time.tv_usec;
292 }
293 else {
294 current_time_usec = 0;
295 }
297 rra_current = rra_start = rra_begin = ftell(rrd_file);
298 /* This is defined in the ANSI C standard, section 7.9.5.3:
300 When a file is opened with udpate mode ('+' as the second
301 or third character in the ... list of mode argument
302 variables), both input and ouptut may be performed on the
303 associated stream. However, ... input may not be directly
304 followed by output without an intervening call to a file
305 positioning function, unless the input oepration encounters
306 end-of-file. */
307 #ifdef HAVE_MMAP
308 fseek(rrd_file, 0, SEEK_END);
309 rrd_filesize = ftell(rrd_file);
310 fseek(rrd_file, rra_current, SEEK_SET);
311 #else
312 fseek(rrd_file, 0, SEEK_CUR);
313 #endif
316 /* get exclusive lock to whole file.
317 * lock gets removed when we close the file.
318 */
319 if (LockRRD(rrd_file) != 0) {
320 rrd_set_error("could not lock RRD");
321 rrd_free(&rrd);
322 fclose(rrd_file);
323 return(-1);
324 }
326 if((updvals = malloc( sizeof(char*) * (rrd.stat_head->ds_cnt+1)))==NULL){
327 rrd_set_error("allocating updvals pointer array");
328 rrd_free(&rrd);
329 fclose(rrd_file);
330 return(-1);
331 }
333 if ((pdp_temp = malloc(sizeof(rrd_value_t)
334 *rrd.stat_head->ds_cnt))==NULL){
335 rrd_set_error("allocating pdp_temp ...");
336 free(updvals);
337 rrd_free(&rrd);
338 fclose(rrd_file);
339 return(-1);
340 }
342 if ((tmpl_idx = malloc(sizeof(unsigned long)
343 *(rrd.stat_head->ds_cnt+1)))==NULL){
344 rrd_set_error("allocating tmpl_idx ...");
345 free(pdp_temp);
346 free(updvals);
347 rrd_free(&rrd);
348 fclose(rrd_file);
349 return(-1);
350 }
351 /* initialize tmplt redirector */
352 /* default config example (assume DS 1 is a CDEF DS)
353 tmpl_idx[0] -> 0; (time)
354 tmpl_idx[1] -> 1; (DS 0)
355 tmpl_idx[2] -> 3; (DS 2)
356 tmpl_idx[3] -> 4; (DS 3) */
357 tmpl_idx[0] = 0; /* time */
358 for (i = 1, ii = 1 ; i <= rrd.stat_head->ds_cnt ; i++)
359 {
360 if (dst_conv(rrd.ds_def[i-1].dst) != DST_CDEF)
361 tmpl_idx[ii++]=i;
362 }
363 tmpl_cnt= ii;
365 if (tmplt) {
366 /* we should work on a writeable copy here */
367 char *dsname;
368 unsigned int tmpl_len;
369 char *tmplt_copy = strdup(tmplt);
370 dsname = tmplt_copy;
371 tmpl_cnt = 1; /* the first entry is the time */
372 tmpl_len = strlen(tmplt_copy);
373 for(i=0;i<=tmpl_len ;i++) {
374 if (tmplt_copy[i] == ':' || tmplt_copy[i] == '\0') {
375 tmplt_copy[i] = '\0';
376 if (tmpl_cnt>rrd.stat_head->ds_cnt){
377 rrd_set_error("tmplt contains more DS definitions than RRD");
378 free(updvals); free(pdp_temp);
379 free(tmpl_idx); rrd_free(&rrd);
380 fclose(rrd_file); return(-1);
381 }
382 if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd,dsname)) == -1){
383 rrd_set_error("unknown DS name '%s'",dsname);
384 free(updvals); free(pdp_temp);
385 free(tmplt_copy);
386 free(tmpl_idx); rrd_free(&rrd);
387 fclose(rrd_file); return(-1);
388 } else {
389 /* the first element is always the time */
390 tmpl_idx[tmpl_cnt-1]++;
391 /* go to the next entry on the tmplt_copy */
392 dsname = &tmplt_copy[i+1];
393 /* fix the damage we did before */
394 if (i<tmpl_len) {
395 tmplt_copy[i]=':';
396 }
398 }
399 }
400 }
401 free(tmplt_copy);
402 }
403 if ((pdp_new = malloc(sizeof(rrd_value_t)
404 *rrd.stat_head->ds_cnt))==NULL){
405 rrd_set_error("allocating pdp_new ...");
406 free(updvals);
407 free(pdp_temp);
408 free(tmpl_idx);
409 rrd_free(&rrd);
410 fclose(rrd_file);
411 return(-1);
412 }
414 #ifdef HAVE_MMAP
415 rrd_mmaped_file = mmap(0,
416 rrd_filesize,
417 PROT_READ | PROT_WRITE,
418 MAP_SHARED,
419 fileno(rrd_file),
420 0);
421 if (rrd_mmaped_file == MAP_FAILED) {
422 rrd_set_error("error mmapping file %s", filename);
423 free(updvals);
424 free(pdp_temp);
425 free(tmpl_idx);
426 rrd_free(&rrd);
427 fclose(rrd_file);
428 return(-1);
429 }
430 #endif
431 /* loop through the arguments. */
432 for(arg_i=0; arg_i<argc;arg_i++) {
433 char *stepper = strdup(argv[arg_i]);
434 char *step_start = stepper;
435 char *p;
436 char *parsetime_error = NULL;
437 enum {atstyle, normal} timesyntax;
438 struct rrd_time_value ds_tv;
439 if (stepper == NULL){
440 rrd_set_error("failed duplication argv entry");
441 free(step_start);
442 free(updvals);
443 free(pdp_temp);
444 free(tmpl_idx);
445 rrd_free(&rrd);
446 #ifdef HAVE_MMAP
447 munmap(rrd_mmaped_file, rrd_filesize);
448 #endif
449 fclose(rrd_file);
450 return(-1);
451 }
452 /* initialize all ds input to unknown except the first one
453 which has always got to be set */
454 for(ii=1;ii<=rrd.stat_head->ds_cnt;ii++) updvals[ii] = "U";
455 updvals[0]=stepper;
456 /* separate all ds elements; first must be examined separately
457 due to alternate time syntax */
458 if ((p=strchr(stepper,'@'))!=NULL) {
459 timesyntax = atstyle;
460 *p = '\0';
461 stepper = p+1;
462 } else if ((p=strchr(stepper,':'))!=NULL) {
463 timesyntax = normal;
464 *p = '\0';
465 stepper = p+1;
466 } else {
467 rrd_set_error("expected timestamp not found in data source from %s",
468 argv[arg_i]);
469 free(step_start);
470 break;
471 }
472 ii=1;
473 updvals[tmpl_idx[ii]] = stepper;
474 while (*stepper) {
475 if (*stepper == ':') {
476 *stepper = '\0';
477 ii++;
478 if (ii<tmpl_cnt){
479 updvals[tmpl_idx[ii]] = stepper+1;
480 }
481 }
482 stepper++;
483 }
485 if (ii != tmpl_cnt-1) {
486 rrd_set_error("expected %lu data source readings (got %lu) from %s",
487 tmpl_cnt-1, ii, argv[arg_i]);
488 free(step_start);
489 break;
490 }
492 /* get the time from the reading ... handle N */
493 if (timesyntax == atstyle) {
494 if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
495 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error );
496 free(step_start);
497 break;
498 }
499 if (ds_tv.type == RELATIVE_TO_END_TIME ||
500 ds_tv.type == RELATIVE_TO_START_TIME) {
501 rrd_set_error("specifying time relative to the 'start' "
502 "or 'end' makes no sense here: %s",
503 updvals[0]);
504 free(step_start);
505 break;
506 }
508 current_time = mktime(&ds_tv.tm) + ds_tv.offset;
509 current_time_usec = 0; /* FIXME: how to handle usecs here ? */
511 } else if (strcmp(updvals[0],"N")==0){
512 gettimeofday(&tmp_time, 0);
513 normalize_time(&tmp_time);
514 current_time = tmp_time.tv_sec;
515 current_time_usec = tmp_time.tv_usec;
516 } else {
517 double tmp;
518 tmp = strtod(updvals[0], 0);
519 current_time = floor(tmp);
520 current_time_usec = (long)((tmp-(double)current_time) * 1000000.0);
521 }
522 /* dont do any correction for old version RRDs */
523 if(version < 3)
524 current_time_usec = 0;
526 if(current_time < rrd.live_head->last_up ||
527 (current_time == rrd.live_head->last_up &&
528 (long)current_time_usec <= (long)rrd.live_head->last_up_usec)) {
529 rrd_set_error("illegal attempt to update using time %ld when "
530 "last update time is %ld (minimum one second step)",
531 current_time, rrd.live_head->last_up);
532 free(step_start);
533 break;
534 }
537 /* seek to the beginning of the rra's */
538 if (rra_current != rra_begin) {
539 #ifndef HAVE_MMAP
540 if(fseek(rrd_file, rra_begin, SEEK_SET) != 0) {
541 rrd_set_error("seek error in rrd");
542 free(step_start);
543 break;
544 }
545 #endif
546 rra_current = rra_begin;
547 }
548 rra_start = rra_begin;
550 /* when was the current pdp started */
551 proc_pdp_age = rrd.live_head->last_up % rrd.stat_head->pdp_step;
552 proc_pdp_st = rrd.live_head->last_up - proc_pdp_age;
554 /* when did the last pdp_st occur */
555 occu_pdp_age = current_time % rrd.stat_head->pdp_step;
556 occu_pdp_st = current_time - occu_pdp_age;
558 /* interval = current_time - rrd.live_head->last_up; */
559 interval = (double)(current_time - rrd.live_head->last_up)
560 + (double)((long)current_time_usec - (long)rrd.live_head->last_up_usec)/1000000.0;
562 if (occu_pdp_st > proc_pdp_st){
563 /* OK we passed the pdp_st moment*/
564 pre_int = (long)occu_pdp_st - rrd.live_head->last_up; /* how much of the input data
565 * occurred before the latest
566 * pdp_st moment*/
567 pre_int -= ((double)rrd.live_head->last_up_usec)/1000000.0; /* adjust usecs */
568 post_int = occu_pdp_age; /* how much after it */
569 post_int += ((double)current_time_usec)/1000000.0; /* adjust usecs */
570 } else {
571 pre_int = interval;
572 post_int = 0;
573 }
575 #ifdef DEBUG
576 printf(
577 "proc_pdp_age %lu\t"
578 "proc_pdp_st %lu\t"
579 "occu_pfp_age %lu\t"
580 "occu_pdp_st %lu\t"
581 "int %lf\t"
582 "pre_int %lf\t"
583 "post_int %lf\n", proc_pdp_age, proc_pdp_st,
584 occu_pdp_age, occu_pdp_st,
585 interval, pre_int, post_int);
586 #endif
588 /* process the data sources and update the pdp_prep
589 * area accordingly */
590 for(i=0;i<rrd.stat_head->ds_cnt;i++){
591 enum dst_en dst_idx;
592 dst_idx= dst_conv(rrd.ds_def[i].dst);
594 /* make sure we do not build diffs with old last_ds values */
595 if(rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt < interval) {
596 strncpy(rrd.pdp_prep[i].last_ds,"U",LAST_DS_LEN-1);
597 rrd.pdp_prep[i].last_ds[LAST_DS_LEN-1]='\0';
598 }
600 /* NOTE: DST_CDEF should never enter this if block, because
601 * updvals[i+1][0] is initialized to 'U'; unless the caller
602 * accidently specified a value for the DST_CDEF. To handle
603 * this case, an extra check is required. */
605 if((updvals[i+1][0] != 'U') &&
606 (dst_idx != DST_CDEF) &&
607 rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {
608 double rate = DNAN;
609 /* the data source type defines how to process the data */
610 /* pdp_new contains rate * time ... eg the bytes
611 * transferred during the interval. Doing it this way saves
612 * a lot of math operations */
615 switch(dst_idx){
616 case DST_COUNTER:
617 case DST_DERIVE:
618 if(rrd.pdp_prep[i].last_ds[0] != 'U'){
619 for(ii=0;updvals[i+1][ii] != '\0';ii++){
620 if((updvals[i+1][ii] < '0' || updvals[i+1][ii] > '9') && (ii != 0 && updvals[i+1][ii] != '-')){
621 rrd_set_error("not a simple integer: '%s'",updvals[i+1]);
622 break;
623 }
624 }
625 if (rrd_test_error()){
626 break;
627 }
628 pdp_new[i]= rrd_diff(updvals[i+1],rrd.pdp_prep[i].last_ds);
629 if(dst_idx == DST_COUNTER) {
630 /* simple overflow catcher suggested by Andres Kroonmaa */
631 /* this will fail terribly for non 32 or 64 bit counters ... */
632 /* are there any others in SNMP land ? */
633 if (pdp_new[i] < (double)0.0 )
634 pdp_new[i] += (double)4294967296.0 ; /* 2^32 */
635 if (pdp_new[i] < (double)0.0 )
636 pdp_new[i] += (double)18446744069414584320.0; /* 2^64-2^32 */;
637 }
638 rate = pdp_new[i] / interval;
639 }
640 else {
641 pdp_new[i]= DNAN;
642 }
643 break;
644 case DST_ABSOLUTE:
645 errno = 0;
646 pdp_new[i] = strtod(updvals[i+1],&endptr);
647 if (errno > 0){
648 rrd_set_error("converting '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
649 break;
650 };
651 if (endptr[0] != '\0'){
652 rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
653 break;
654 }
655 rate = pdp_new[i] / interval;
656 break;
657 case DST_GAUGE:
658 errno = 0;
659 pdp_new[i] = strtod(updvals[i+1],&endptr) * interval;
660 if (errno > 0){
661 rrd_set_error("converting '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
662 break;
663 };
664 if (endptr[0] != '\0'){
665 rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
666 break;
667 }
668 rate = pdp_new[i] / interval;
669 break;
670 default:
671 rrd_set_error("rrd contains unknown DS type : '%s'",
672 rrd.ds_def[i].dst);
673 break;
674 }
675 /* break out of this for loop if the error string is set */
676 if (rrd_test_error()){
677 break;
678 }
679 /* make sure pdp_temp is neither too large or too small
680 * if any of these occur it becomes unknown ...
681 * sorry folks ... */
682 if ( ! isnan(rate) &&
683 (( ! isnan(rrd.ds_def[i].par[DS_max_val].u_val) &&
684 rate > rrd.ds_def[i].par[DS_max_val].u_val ) ||
685 ( ! isnan(rrd.ds_def[i].par[DS_min_val].u_val) &&
686 rate < rrd.ds_def[i].par[DS_min_val].u_val ))){
687 pdp_new[i] = DNAN;
688 }
689 } else {
690 /* no news is news all the same */
691 pdp_new[i] = DNAN;
692 }
695 /* make a copy of the command line argument for the next run */
696 #ifdef DEBUG
697 fprintf(stderr,
698 "prep ds[%lu]\t"
699 "last_arg '%s'\t"
700 "this_arg '%s'\t"
701 "pdp_new %10.2f\n",
702 i,
703 rrd.pdp_prep[i].last_ds,
704 updvals[i+1], pdp_new[i]);
705 #endif
706 strncpy(rrd.pdp_prep[i].last_ds, updvals[i+1],LAST_DS_LEN-1);
707 rrd.pdp_prep[i].last_ds[LAST_DS_LEN-1]='\0';
708 }
709 /* break out of the argument parsing loop if the error_string is set */
710 if (rrd_test_error()){
711 free(step_start);
712 break;
713 }
714 /* has a pdp_st moment occurred since the last run ? */
716 if (proc_pdp_st == occu_pdp_st){
717 /* no we have not passed a pdp_st moment. therefore update is simple */
719 for(i=0;i<rrd.stat_head->ds_cnt;i++){
720 if(isnan(pdp_new[i])) {
721 /* this is not realy accurate if we use subsecond data arival time
722 should have thought of it when going subsecond resolution ...
723 sorry next format change we will have it! */
724 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += floor(interval);
725 } else {
726 if (isnan( rrd.pdp_prep[i].scratch[PDP_val].u_val )){
727 rrd.pdp_prep[i].scratch[PDP_val].u_val= pdp_new[i];
728 } else {
729 rrd.pdp_prep[i].scratch[PDP_val].u_val+= pdp_new[i];
730 }
731 }
732 #ifdef DEBUG
733 fprintf(stderr,
734 "NO PDP ds[%lu]\t"
735 "value %10.2f\t"
736 "unkn_sec %5lu\n",
737 i,
738 rrd.pdp_prep[i].scratch[PDP_val].u_val,
739 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
740 #endif
741 }
742 } else {
743 /* an pdp_st has occurred. */
745 /* in pdp_prep[].scratch[PDP_val].u_val we have collected rate*seconds which
746 * occurred up to the last run.
747 pdp_new[] contains rate*seconds from the latest run.
748 pdp_temp[] will contain the rate for cdp */
750 for(i=0;i<rrd.stat_head->ds_cnt;i++){
751 /* update pdp_prep to the current pdp_st. */
752 double pre_unknown = 0.0;
753 if(isnan(pdp_new[i]))
754 /* a final bit of unkonwn to be added bevore calculation
755 * we use a tempaorary variable for this so that we
756 * don't have to turn integer lines before using the value */
757 pre_unknown = pre_int;
758 else {
759 if (isnan( rrd.pdp_prep[i].scratch[PDP_val].u_val )){
760 rrd.pdp_prep[i].scratch[PDP_val].u_val= pdp_new[i]/interval*pre_int;
761 } else {
762 rrd.pdp_prep[i].scratch[PDP_val].u_val+= pdp_new[i]/interval*pre_int;
763 }
764 }
767 /* if too much of the pdp_prep is unknown we dump it */
768 if (
769 /* removed because this does not agree with the definition
770 a heart beat can be unknown */
771 /* (rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt
772 > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) || */
773 /* if the interval is larger thatn mrhb we get NAN */
774 (interval > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) ||
775 (occu_pdp_st-proc_pdp_st <=
776 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)) {
777 pdp_temp[i] = DNAN;
778 } else {
779 pdp_temp[i] = rrd.pdp_prep[i].scratch[PDP_val].u_val
780 / ((double)(occu_pdp_st - proc_pdp_st
781 - rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)
782 -pre_unknown);
783 }
785 /* process CDEF data sources; remember each CDEF DS can
786 * only reference other DS with a lower index number */
787 if (dst_conv(rrd.ds_def[i].dst) == DST_CDEF) {
788 rpnp_t *rpnp;
789 rpnp = rpn_expand((rpn_cdefds_t *) &(rrd.ds_def[i].par[DS_cdef]));
790 /* substitue data values for OP_VARIABLE nodes */
791 for (ii = 0; rpnp[ii].op != OP_END; ii++)
792 {
793 if (rpnp[ii].op == OP_VARIABLE) {
794 rpnp[ii].op = OP_NUMBER;
795 rpnp[ii].val = pdp_temp[rpnp[ii].ptr];
796 }
797 }
798 /* run the rpn calculator */
799 if (rpn_calc(rpnp,&rpnstack,0,pdp_temp,i) == -1) {
800 free(rpnp);
801 break; /* exits the data sources pdp_temp loop */
802 }
803 }
805 /* make pdp_prep ready for the next run */
806 if(isnan(pdp_new[i])){
807 /* this is not realy accurate if we use subsecond data arival time
808 should have thought of it when going subsecond resolution ...
809 sorry next format change we will have it! */
810 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = floor(post_int);
811 rrd.pdp_prep[i].scratch[PDP_val].u_val = DNAN;
812 } else {
813 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = 0;
814 rrd.pdp_prep[i].scratch[PDP_val].u_val =
815 pdp_new[i]/interval*post_int;
816 }
818 #ifdef DEBUG
819 fprintf(stderr,
820 "PDP UPD ds[%lu]\t"
821 "pdp_temp %10.2f\t"
822 "new_prep %10.2f\t"
823 "new_unkn_sec %5lu\n",
824 i, pdp_temp[i],
825 rrd.pdp_prep[i].scratch[PDP_val].u_val,
826 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
827 #endif
828 }
830 /* if there were errors during the last loop, bail out here */
831 if (rrd_test_error()){
832 free(step_start);
833 break;
834 }
836 /* compute the number of elapsed pdp_st moments */
837 elapsed_pdp_st = (occu_pdp_st - proc_pdp_st) / rrd.stat_head -> pdp_step;
838 #ifdef DEBUG
839 fprintf(stderr,"elapsed PDP steps: %lu\n", elapsed_pdp_st);
840 #endif
841 if (rra_step_cnt == NULL)
842 {
843 rra_step_cnt = (unsigned long *)
844 malloc((rrd.stat_head->rra_cnt)* sizeof(unsigned long));
845 }
847 for(i = 0, rra_start = rra_begin;
848 i < rrd.stat_head->rra_cnt;
849 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
850 i++)
851 {
852 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
853 start_pdp_offset = rrd.rra_def[i].pdp_cnt -
854 (proc_pdp_st / rrd.stat_head -> pdp_step) % rrd.rra_def[i].pdp_cnt;
855 if (start_pdp_offset <= elapsed_pdp_st) {
856 rra_step_cnt[i] = (elapsed_pdp_st - start_pdp_offset) /
857 rrd.rra_def[i].pdp_cnt + 1;
858 } else {
859 rra_step_cnt[i] = 0;
860 }
862 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
863 {
864 /* If this is a bulk update, we need to skip ahead in the seasonal
865 * arrays so that they will be correct for the next observed value;
866 * note that for the bulk update itself, no update will occur to
867 * DEVSEASONAL or SEASONAL; futhermore, HWPREDICT and DEVPREDICT will
868 * be set to DNAN. */
869 if (rra_step_cnt[i] > 2)
870 {
871 /* skip update by resetting rra_step_cnt[i],
872 * note that this is not data source specific; this is due
873 * to the bulk update, not a DNAN value for the specific data
874 * source. */
875 rra_step_cnt[i] = 0;
876 lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st,
877 &last_seasonal_coef);
878 lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st + 1,
879 &seasonal_coef);
880 }
882 /* periodically run a smoother for seasonal effects */
883 /* Need to use first cdp parameter buffer to track
884 * burnin (burnin requires a specific smoothing schedule).
885 * The CDP_init_seasonal parameter is really an RRA level,
886 * not a data source within RRA level parameter, but the rra_def
887 * is read only for rrd_update (not flushed to disk). */
888 iii = i*(rrd.stat_head -> ds_cnt);
889 if (rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt
890 <= BURNIN_CYCLES)
891 {
892 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
893 > rrd.rra_def[i].row_cnt - 1) {
894 /* mark off one of the burnin cycles */
895 ++(rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt);
896 schedule_smooth = 1;
897 }
898 } else {
899 /* someone has no doubt invented a trick to deal with this
900 * wrap around, but at least this code is clear. */
901 if (rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt >
902 rrd.rra_ptr[i].cur_row)
903 {
904 /* here elapsed_pdp_st = rra_step_cnt[i] because of 1-1
905 * mapping between PDP and CDP */
906 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
907 >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
908 {
909 #ifdef DEBUG
910 fprintf(stderr,
911 "schedule_smooth 1: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
912 rrd.rra_ptr[i].cur_row, elapsed_pdp_st,
913 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
914 #endif
915 schedule_smooth = 1;
916 }
917 } else {
918 /* can't rely on negative numbers because we are working with
919 * unsigned values */
920 /* Don't need modulus here. If we've wrapped more than once, only
921 * one smooth is executed at the end. */
922 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st >= rrd.rra_def[i].row_cnt
923 && rrd.rra_ptr[i].cur_row + elapsed_pdp_st - rrd.rra_def[i].row_cnt
924 >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
925 {
926 #ifdef DEBUG
927 fprintf(stderr,
928 "schedule_smooth 2: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
929 rrd.rra_ptr[i].cur_row, elapsed_pdp_st,
930 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
931 #endif
932 schedule_smooth = 1;
933 }
934 }
935 }
937 rra_current = ftell(rrd_file);
938 } /* if cf is DEVSEASONAL or SEASONAL */
940 if (rrd_test_error()) break;
942 /* update CDP_PREP areas */
943 /* loop over data soures within each RRA */
944 for(ii = 0;
945 ii < rrd.stat_head->ds_cnt;
946 ii++)
947 {
949 /* iii indexes the CDP prep area for this data source within the RRA */
950 iii=i*rrd.stat_head->ds_cnt+ii;
952 if (rrd.rra_def[i].pdp_cnt > 1) {
954 if (rra_step_cnt[i] > 0) {
955 /* If we are in this block, as least 1 CDP value will be written to
956 * disk, this is the CDP_primary_val entry. If more than 1 value needs
957 * to be written, then the "fill in" value is the CDP_secondary_val
958 * entry. */
959 if (isnan(pdp_temp[ii]))
960 {
961 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += start_pdp_offset;
962 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
963 } else {
964 /* CDP_secondary value is the RRA "fill in" value for intermediary
965 * CDP data entries. No matter the CF, the value is the same because
966 * the average, max, min, and last of a list of identical values is
967 * the same, namely, the value itself. */
968 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = pdp_temp[ii];
969 }
971 if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt
972 > rrd.rra_def[i].pdp_cnt*
973 rrd.rra_def[i].par[RRA_cdp_xff_val].u_val)
974 {
975 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
976 /* initialize carry over */
977 if (current_cf == CF_AVERAGE) {
978 if (isnan(pdp_temp[ii])) {
979 rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
980 } else {
981 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
982 ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
983 }
984 } else {
985 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
986 }
987 } else {
988 rrd_value_t cum_val, cur_val;
989 switch (current_cf) {
990 case CF_AVERAGE:
991 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, 0.0);
992 cur_val = IFDNAN(pdp_temp[ii],0.0);
993 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val =
994 (cum_val + cur_val * start_pdp_offset) /
995 (rrd.rra_def[i].pdp_cnt
996 -rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt);
997 /* initialize carry over value */
998 if (isnan(pdp_temp[ii])) {
999 rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
1000 } else {
1001 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
1002 ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
1003 }
1004 break;
1005 case CF_MAXIMUM:
1006 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, -DINF);
1007 cur_val = IFDNAN(pdp_temp[ii],-DINF);
1008 #ifdef DEBUG
1009 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
1010 isnan(pdp_temp[ii])) {
1011 fprintf(stderr,
1012 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1013 i,ii);
1014 exit(-1);
1015 }
1016 #endif
1017 if (cur_val > cum_val)
1018 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
1019 else
1020 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
1021 /* initialize carry over value */
1022 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1023 break;
1024 case CF_MINIMUM:
1025 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, DINF);
1026 cur_val = IFDNAN(pdp_temp[ii],DINF);
1027 #ifdef DEBUG
1028 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
1029 isnan(pdp_temp[ii])) {
1030 fprintf(stderr,
1031 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1032 i,ii);
1033 exit(-1);
1034 }
1035 #endif
1036 if (cur_val < cum_val)
1037 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
1038 else
1039 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
1040 /* initialize carry over value */
1041 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1042 break;
1043 case CF_LAST:
1044 default:
1045 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = pdp_temp[ii];
1046 /* initialize carry over value */
1047 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1048 break;
1049 }
1050 } /* endif meets xff value requirement for a valid value */
1051 /* initialize carry over CDP_unkn_pdp_cnt, this must happen after CDP_primary_val
1052 * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1053 if (isnan(pdp_temp[ii]))
1054 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt =
1055 (elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt;
1056 else
1057 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 0;
1058 } else /* rra_step_cnt[i] == 0 */
1059 {
1060 #ifdef DEBUG
1061 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val)) {
1062 fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, DNAN\n",
1063 i,ii);
1064 } else {
1065 fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, %10.2f\n",
1066 i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1067 }
1068 #endif
1069 if (isnan(pdp_temp[ii])) {
1070 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += elapsed_pdp_st;
1071 } else if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val))
1072 {
1073 if (current_cf == CF_AVERAGE) {
1074 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
1075 elapsed_pdp_st;
1076 } else {
1077 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1078 }
1079 #ifdef DEBUG
1080 fprintf(stderr,"Initialize CDP_val for RRA %lu DS %lu: %10.2f\n",
1081 i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1082 #endif
1083 } else {
1084 switch (current_cf) {
1085 case CF_AVERAGE:
1086 rrd.cdp_prep[iii].scratch[CDP_val].u_val += pdp_temp[ii] *
1087 elapsed_pdp_st;
1088 break;
1089 case CF_MINIMUM:
1090 if (pdp_temp[ii] < rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1091 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1092 break;
1093 case CF_MAXIMUM:
1094 if (pdp_temp[ii] > rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1095 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1096 break;
1097 case CF_LAST:
1098 default:
1099 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1100 break;
1101 }
1102 }
1103 }
1104 } else { /* rrd.rra_def[i].pdp_cnt == 1 */
1105 if (elapsed_pdp_st > 2)
1106 {
1107 switch (current_cf) {
1108 case CF_AVERAGE:
1109 default:
1110 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val=pdp_temp[ii];
1111 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val=pdp_temp[ii];
1112 break;
1113 case CF_SEASONAL:
1114 case CF_DEVSEASONAL:
1115 /* need to update cached seasonal values, so they are consistent
1116 * with the bulk update */
1117 /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1118 * CDP_last_deviation are the same. */
1119 rrd.cdp_prep[iii].scratch[CDP_hw_last_seasonal].u_val =
1120 last_seasonal_coef[ii];
1121 rrd.cdp_prep[iii].scratch[CDP_hw_seasonal].u_val =
1122 seasonal_coef[ii];
1123 break;
1124 case CF_HWPREDICT:
1125 /* need to update the null_count and last_null_count.
1126 * even do this for non-DNAN pdp_temp because the
1127 * algorithm is not learning from batch updates. */
1128 rrd.cdp_prep[iii].scratch[CDP_null_count].u_cnt +=
1129 elapsed_pdp_st;
1130 rrd.cdp_prep[iii].scratch[CDP_last_null_count].u_cnt +=
1131 elapsed_pdp_st - 1;
1132 /* fall through */
1133 case CF_DEVPREDICT:
1134 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
1135 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
1136 break;
1137 case CF_FAILURES:
1138 /* do not count missed bulk values as failures */
1139 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = 0;
1140 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = 0;
1141 /* need to reset violations buffer.
1142 * could do this more carefully, but for now, just
1143 * assume a bulk update wipes away all violations. */
1144 erase_violations(&rrd, iii, i);
1145 break;
1146 }
1147 }
1148 } /* endif rrd.rra_def[i].pdp_cnt == 1 */
1150 if (rrd_test_error()) break;
1152 } /* endif data sources loop */
1153 } /* end RRA Loop */
1155 /* this loop is only entered if elapsed_pdp_st < 3 */
1156 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1157 j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val)
1158 {
1159 for(i = 0, rra_start = rra_begin;
1160 i < rrd.stat_head->rra_cnt;
1161 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1162 i++)
1163 {
1164 if (rrd.rra_def[i].pdp_cnt > 1) continue;
1166 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
1167 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
1168 {
1169 lookup_seasonal(&rrd,i,rra_start,rrd_file,
1170 elapsed_pdp_st + (scratch_idx == CDP_primary_val ? 1 : 2),
1171 &seasonal_coef);
1172 rra_current = ftell(rrd_file);
1173 }
1174 if (rrd_test_error()) break;
1175 /* loop over data sources within each RRA */
1176 for(ii = 0;
1177 ii < rrd.stat_head->ds_cnt;
1178 ii++)
1179 {
1180 update_aberrant_CF(&rrd,pdp_temp[ii],current_cf,
1181 i*(rrd.stat_head->ds_cnt) + ii,i,ii,
1182 scratch_idx, seasonal_coef);
1183 }
1184 } /* end RRA Loop */
1185 if (rrd_test_error()) break;
1186 } /* end elapsed_pdp_st loop */
1188 if (rrd_test_error()) break;
1190 /* Ready to write to disk */
1191 /* Move sequentially through the file, writing one RRA at a time.
1192 * Note this architecture divorces the computation of CDP with
1193 * flushing updated RRA entries to disk. */
1194 for(i = 0, rra_start = rra_begin;
1195 i < rrd.stat_head->rra_cnt;
1196 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1197 i++) {
1198 /* is there anything to write for this RRA? If not, continue. */
1199 if (rra_step_cnt[i] == 0) continue;
1201 /* write the first row */
1202 #ifdef DEBUG
1203 fprintf(stderr," -- RRA Preseek %ld\n",ftell(rrd_file));
1204 #endif
1205 rrd.rra_ptr[i].cur_row++;
1206 if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
1207 rrd.rra_ptr[i].cur_row = 0; /* wrap around */
1208 /* position on the first row */
1209 rra_pos_tmp = rra_start +
1210 (rrd.stat_head->ds_cnt)*(rrd.rra_ptr[i].cur_row)*sizeof(rrd_value_t);
1211 if(rra_pos_tmp != rra_current) {
1212 #ifndef HAVE_MMAP
1213 if(fseek(rrd_file, rra_pos_tmp, SEEK_SET) != 0){
1214 rrd_set_error("seek error in rrd");
1215 break;
1216 }
1217 #endif
1218 rra_current = rra_pos_tmp;
1219 }
1221 #ifdef DEBUG
1222 fprintf(stderr," -- RRA Postseek %ld\n",ftell(rrd_file));
1223 #endif
1224 scratch_idx = CDP_primary_val;
1225 if (pcdp_summary != NULL)
1226 {
1227 rra_time = (current_time - current_time
1228 % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1229 - ((rra_step_cnt[i]-1)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1230 }
1231 #ifdef HAVE_MMAP
1232 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1233 pcdp_summary, &rra_time, rrd_mmaped_file);
1234 #else
1235 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1236 pcdp_summary, &rra_time);
1237 #endif
1238 if (rrd_test_error()) break;
1240 /* write other rows of the bulk update, if any */
1241 scratch_idx = CDP_secondary_val;
1242 for ( ; rra_step_cnt[i] > 1; rra_step_cnt[i]--)
1243 {
1244 if (++rrd.rra_ptr[i].cur_row == rrd.rra_def[i].row_cnt)
1245 {
1246 #ifdef DEBUG
1247 fprintf(stderr,"Wraparound for RRA %s, %lu updates left\n",
1248 rrd.rra_def[i].cf_nam, rra_step_cnt[i] - 1);
1249 #endif
1250 /* wrap */
1251 rrd.rra_ptr[i].cur_row = 0;
1252 /* seek back to beginning of current rra */
1253 if (fseek(rrd_file, rra_start, SEEK_SET) != 0)
1254 {
1255 rrd_set_error("seek error in rrd");
1256 break;
1257 }
1258 #ifdef DEBUG
1259 fprintf(stderr," -- Wraparound Postseek %ld\n",ftell(rrd_file));
1260 #endif
1261 rra_current = rra_start;
1262 }
1263 if (pcdp_summary != NULL)
1264 {
1265 rra_time = (current_time - current_time
1266 % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1267 - ((rra_step_cnt[i]-2)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1268 }
1269 #ifdef HAVE_MMAP
1270 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1271 pcdp_summary, &rra_time, rrd_mmaped_file);
1272 #else
1273 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1274 pcdp_summary, &rra_time);
1275 #endif
1276 }
1278 if (rrd_test_error())
1279 break;
1280 } /* RRA LOOP */
1282 /* break out of the argument parsing loop if error_string is set */
1283 if (rrd_test_error()){
1284 free(step_start);
1285 break;
1286 }
1288 } /* endif a pdp_st has occurred */
1289 rrd.live_head->last_up = current_time;
1290 rrd.live_head->last_up_usec = current_time_usec;
1291 free(step_start);
1292 } /* function argument loop */
1294 if (seasonal_coef != NULL) free(seasonal_coef);
1295 if (last_seasonal_coef != NULL) free(last_seasonal_coef);
1296 if (rra_step_cnt != NULL) free(rra_step_cnt);
1297 rpnstack_free(&rpnstack);
1299 #ifdef HAVE_MMAP
1300 if (munmap(rrd_mmaped_file, rrd_filesize) == -1) {
1301 rrd_set_error("error writing(unmapping) file: %s", filename);
1302 }
1303 #endif
1304 /* if we got here and if there is an error and if the file has not been
1305 * written to, then close things up and return. */
1306 if (rrd_test_error()) {
1307 free(updvals);
1308 free(tmpl_idx);
1309 rrd_free(&rrd);
1310 free(pdp_temp);
1311 free(pdp_new);
1312 fclose(rrd_file);
1313 return(-1);
1314 }
1316 /* aargh ... that was tough ... so many loops ... anyway, its done.
1317 * we just need to write back the live header portion now*/
1319 if (fseek(rrd_file, (sizeof(stat_head_t)
1320 + sizeof(ds_def_t)*rrd.stat_head->ds_cnt
1321 + sizeof(rra_def_t)*rrd.stat_head->rra_cnt),
1322 SEEK_SET) != 0) {
1323 rrd_set_error("seek rrd for live header writeback");
1324 free(updvals);
1325 free(tmpl_idx);
1326 rrd_free(&rrd);
1327 free(pdp_temp);
1328 free(pdp_new);
1329 fclose(rrd_file);
1330 return(-1);
1331 }
1333 if(version >= 3) {
1334 if(fwrite( rrd.live_head,
1335 sizeof(live_head_t), 1, rrd_file) != 1){
1336 rrd_set_error("fwrite live_head to rrd");
1337 free(updvals);
1338 rrd_free(&rrd);
1339 free(tmpl_idx);
1340 free(pdp_temp);
1341 free(pdp_new);
1342 fclose(rrd_file);
1343 return(-1);
1344 }
1345 }
1346 else {
1347 if(fwrite( &rrd.live_head->last_up,
1348 sizeof(time_t), 1, rrd_file) != 1){
1349 rrd_set_error("fwrite live_head to rrd");
1350 free(updvals);
1351 rrd_free(&rrd);
1352 free(tmpl_idx);
1353 free(pdp_temp);
1354 free(pdp_new);
1355 fclose(rrd_file);
1356 return(-1);
1357 }
1358 }
1361 if(fwrite( rrd.pdp_prep,
1362 sizeof(pdp_prep_t),
1363 rrd.stat_head->ds_cnt, rrd_file) != rrd.stat_head->ds_cnt){
1364 rrd_set_error("ftwrite pdp_prep to rrd");
1365 free(updvals);
1366 rrd_free(&rrd);
1367 free(tmpl_idx);
1368 free(pdp_temp);
1369 free(pdp_new);
1370 fclose(rrd_file);
1371 return(-1);
1372 }
1374 if(fwrite( rrd.cdp_prep,
1375 sizeof(cdp_prep_t),
1376 rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt, rrd_file)
1377 != rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt){
1379 rrd_set_error("ftwrite cdp_prep to rrd");
1380 free(updvals);
1381 free(tmpl_idx);
1382 rrd_free(&rrd);
1383 free(pdp_temp);
1384 free(pdp_new);
1385 fclose(rrd_file);
1386 return(-1);
1387 }
1389 if(fwrite( rrd.rra_ptr,
1390 sizeof(rra_ptr_t),
1391 rrd.stat_head->rra_cnt,rrd_file) != rrd.stat_head->rra_cnt){
1392 rrd_set_error("fwrite rra_ptr to rrd");
1393 free(updvals);
1394 free(tmpl_idx);
1395 rrd_free(&rrd);
1396 free(pdp_temp);
1397 free(pdp_new);
1398 fclose(rrd_file);
1399 return(-1);
1400 }
1401 #ifdef HAVE_POSIX_FADVISE
1403 /* with update we have write ops, so they will probably not be done by now, this means
1404 the buffers will not get freed. But calling this for the whole file - header
1405 will let the data off the hook as soon as it is written when if it is from a previous
1406 update cycle. Calling fdsync to force things is much too hard here. */
1408 if (0 != posix_fadvise(fileno(rrd_file), rra_begin, 0, POSIX_FADV_DONTNEED)) {
1409 rrd_set_error("setting POSIX_FADV_DONTNEED on '%s': %s",filename, rrd_strerror(errno));
1410 fclose(rrd_file);
1411 return(-1);
1412 }
1413 #endif
1415 /* OK now close the files and free the memory */
1416 if(fclose(rrd_file) != 0){
1417 rrd_set_error("closing rrd");
1418 free(updvals);
1419 free(tmpl_idx);
1420 rrd_free(&rrd);
1421 free(pdp_temp);
1422 free(pdp_new);
1423 return(-1);
1424 }
1426 /* calling the smoothing code here guarantees at most
1427 * one smoothing operation per rrd_update call. Unfortunately,
1428 * it is possible with bulk updates, or a long-delayed update
1429 * for smoothing to occur off-schedule. This really isn't
1430 * critical except during the burning cycles. */
1431 if (schedule_smooth)
1432 {
1433 rrd_file = fopen(filename,"rb+");
1436 rra_start = rra_begin;
1437 for (i = 0; i < rrd.stat_head -> rra_cnt; ++i)
1438 {
1439 if (cf_conv(rrd.rra_def[i].cf_nam) == CF_DEVSEASONAL ||
1440 cf_conv(rrd.rra_def[i].cf_nam) == CF_SEASONAL)
1441 {
1442 #ifdef DEBUG
1443 fprintf(stderr,"Running smoother for rra %ld\n",i);
1444 #endif
1445 apply_smoother(&rrd,i,rra_start,rrd_file);
1446 if (rrd_test_error())
1447 break;
1448 }
1449 rra_start += rrd.rra_def[i].row_cnt
1450 *rrd.stat_head->ds_cnt*sizeof(rrd_value_t);
1451 }
1452 #ifdef HAVE_POSIX_FADVISE
1453 /* same procedure as above ... */
1454 if (0 != posix_fadvise(fileno(rrd_file), rra_begin, 0, POSIX_FADV_DONTNEED)) {
1455 rrd_set_error("setting POSIX_FADV_DONTNEED on '%s': %s",filename, rrd_strerror(errno));
1456 fclose(rrd_file);
1457 return(-1);
1458 }
1459 #endif
1460 fclose(rrd_file);
1461 }
1462 rrd_free(&rrd);
1463 free(updvals);
1464 free(tmpl_idx);
1465 free(pdp_new);
1466 free(pdp_temp);
1467 return(0);
1468 }
/*
 * LockRRD - try to take an exclusive lock on an entire RRD file.
 *
 * The request does not wait for a competing lock holder: it is issued
 * with fcntl(F_SETLK) on POSIX systems and with _locking(_LK_NBLCK) on
 * native Win32.  The lock goes away when the file is closed.
 *
 *   rrdfile  open stream of the RRD file to lock
 *
 * Returns the status of the underlying locking call, which is 0 on
 * success.  On native Win32, -1 is also returned when the file size
 * needed for the locking call cannot be obtained with _fstat().
 */
int
LockRRD(FILE *rrdfile)
{
    const int fd = fileno(rrdfile);

#if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
    struct _stat file_info;

    /* _locking() needs a byte count, so look up the current file size */
    if (_fstat(fd, &file_info) != 0)
        return -1;

    return _locking(fd, _LK_NBLCK, file_info.st_size);
#else
    struct flock whole_file;

    /* offset 0 relative to the start of the file, length 0: this is how
     * fcntl() is asked to cover the file from its first byte to its end */
    whole_file.l_whence = SEEK_SET;
    whole_file.l_start = 0;
    whole_file.l_len = 0;
    whole_file.l_type = F_WRLCK;    /* exclusive write lock */

    return fcntl(fd, F_SETLK, &whole_file);
#endif
}
1508 #ifdef HAVE_MMAP
1509 info_t
1510 *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1511 unsigned short CDP_scratch_idx,
1512 #ifndef DEBUG
1513 FILE UNUSED(*rrd_file),
1514 #else
1515 FILE *rrd_file,
1516 #endif
1517 info_t *pcdp_summary, time_t *rra_time, void *rrd_mmaped_file)
1518 #else
1519 info_t
1520 *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1521 unsigned short CDP_scratch_idx, FILE *rrd_file,
1522 info_t *pcdp_summary, time_t *rra_time)
1523 #endif
1524 {
1525 unsigned long ds_idx, cdp_idx;
1526 infoval iv;
1528 for (ds_idx = 0; ds_idx < rrd -> stat_head -> ds_cnt; ds_idx++)
1529 {
1530 /* compute the cdp index */
1531 cdp_idx =rra_idx * (rrd -> stat_head->ds_cnt) + ds_idx;
1532 #ifdef DEBUG
1533 fprintf(stderr," -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1534 rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,ftell(rrd_file),
1535 rrd -> rra_def[rra_idx].cf_nam);
1536 #endif
1537 if (pcdp_summary != NULL)
1538 {
1539 iv.u_val = rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
1540 /* append info to the return hash */
1541 pcdp_summary = info_push(pcdp_summary,
1542 sprintf_alloc("[%d]RRA[%s][%lu]DS[%s]",
1543 *rra_time, rrd->rra_def[rra_idx].cf_nam,
1544 rrd->rra_def[rra_idx].pdp_cnt, rrd->ds_def[ds_idx].ds_nam),
1545 RD_I_VAL, iv);
1546 }
1547 #ifdef HAVE_MMAP
1548 memcpy((char *)rrd_mmaped_file + *rra_current,
1549 &(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1550 sizeof(rrd_value_t));
1551 #else
1552 if(fwrite(&(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1553 sizeof(rrd_value_t),1,rrd_file) != 1)
1554 {
1555 rrd_set_error("writing rrd");
1556 return 0;
1557 }
1558 #endif
1559 *rra_current += sizeof(rrd_value_t);
1560 }
1561 return (pcdp_summary);
1562 }