1 /*****************************************************************************
2 * RRDtool 1.2.16 Copyright by Tobi Oetiker, 1997-2006
3 *****************************************************************************
4 * rrd_update.c RRD Update Function
5 *****************************************************************************
6 * $Id$
7 *****************************************************************************/
9 #include "rrd_tool.h"
10 #include <sys/types.h>
11 #include <fcntl.h>
12 #ifdef HAVE_MMAP
13 #include <sys/mman.h>
14 #endif
16 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
17 #include <sys/locking.h>
18 #include <sys/stat.h>
19 #include <io.h>
20 #endif
22 #include "rrd_hw.h"
23 #include "rrd_rpncalc.h"
25 #include "rrd_is_thread_safe.h"
26 #include "unused.h"
28 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
29 /*
30 * WIN32 does not have gettimeofday and struct timeval. This is a quick and dirty
31 * replacement.
32 */
33 #include <sys/timeb.h>
35 #ifndef __MINGW32__
/* Fallback definition of struct timeval for Win32 toolchains other than
 * MinGW (this definition sits inside an #ifndef __MINGW32__ guard). */
struct timeval {
    time_t tv_sec;      /* whole seconds */
    long   tv_usec;     /* microseconds part */
};
40 #endif
/* Timezone record accepted as the second argument of the local
 * gettimeofday() replacement. */
struct __timezone {
    int tz_minuteswest;     /* minutes west of Greenwich */
    int tz_dsttime;         /* kind of DST correction */
};
47 static int gettimeofday(struct timeval *t, struct __timezone *tz) {
49 struct _timeb current_time;
51 _ftime(¤t_time);
53 t->tv_sec = current_time.time;
54 t->tv_usec = current_time.millitm * 1000;
56 return 0;
57 }
59 #endif
/*
 * Normalize a time value as returned by gettimeofday: the usec part
 * must always be >= 0.  A negative tv_usec is repaired by borrowing
 * one second from tv_sec.
 */
static void normalize_time(struct timeval *t)
{
    if (t->tv_usec >= 0) {
        return;                 /* already normalized */
    }
    t->tv_usec += 1000000L;     /* borrow one second's worth of usecs */
    t->tv_sec -= 1;
}
72 /* Local prototypes */
73 int LockRRD(FILE *rrd_file);
74 #ifdef HAVE_MMAP
75 info_t *write_RRA_row (rrd_t *rrd, unsigned long rra_idx,
76 unsigned long *rra_current,
77 unsigned short CDP_scratch_idx,
78 #ifndef DEBUG
79 FILE UNUSED(*rrd_file),
80 #else
81 FILE *rrd_file,
82 #endif
83 info_t *pcdp_summary, time_t *rra_time, void *rrd_mmaped_file);
84 #else
85 info_t *write_RRA_row (rrd_t *rrd, unsigned long rra_idx,
86 unsigned long *rra_current,
87 unsigned short CDP_scratch_idx, FILE *rrd_file,
88 info_t *pcdp_summary, time_t *rra_time);
89 #endif
90 int rrd_update_r(char *filename, char *tmplt, int argc, char **argv);
91 int _rrd_update(char *filename, char *tmplt, int argc, char **argv,
92 info_t*);
/* Yield X unless it is NaN, in which case yield Y.
 * Expands to a plain parenthesized expression (no trailing semicolon) so
 * it can be used inside larger expressions.  Note that X is evaluated
 * twice. */
#define IFDNAN(X,Y) (isnan(X) ? (Y) : (X))
97 info_t *rrd_update_v(int argc, char **argv)
98 {
99 char *tmplt = NULL;
100 info_t *result = NULL;
101 infoval rc;
102 rc.u_int = -1;
103 optind = 0; opterr = 0; /* initialize getopt */
105 while (1) {
106 static struct option long_options[] =
107 {
108 {"template", required_argument, 0, 't'},
109 {0,0,0,0}
110 };
111 int option_index = 0;
112 int opt;
113 opt = getopt_long(argc, argv, "t:",
114 long_options, &option_index);
116 if (opt == EOF)
117 break;
119 switch(opt) {
120 case 't':
121 tmplt = optarg;
122 break;
124 case '?':
125 rrd_set_error("unknown option '%s'",argv[optind-1]);
126 goto end_tag;
127 }
128 }
130 /* need at least 2 arguments: filename, data. */
131 if (argc-optind < 2) {
132 rrd_set_error("Not enough arguments");
133 goto end_tag;
134 }
135 rc.u_int = 0;
136 result = info_push(NULL,sprintf_alloc("return_value"),RD_I_INT,rc);
137 rc.u_int = _rrd_update(argv[optind], tmplt,
138 argc - optind - 1, argv + optind + 1, result);
139 result->value.u_int = rc.u_int;
140 end_tag:
141 return result;
142 }
/*
 * Command-line style entry point for updating an RRD.
 *
 * Accepts an optional "-t <tmplt>" / "--template <tmplt>" option.  The
 * first non-option argument is the file name; the remaining arguments
 * are handed to rrd_update_r.
 *
 * Returns the value of rrd_update_r, or -1 (with the rrd error string
 * set) on an unknown option or when fewer than two non-option arguments
 * are present.
 */
int
rrd_update(int argc, char **argv)
{
    static struct option long_options[] = {
        {"template", required_argument, 0, 't'},
        {0, 0, 0, 0}
    };
    char *tmplt = NULL;
    int   option_index = 0;
    int   opt;

    /* initialize getopt */
    optind = 0;
    opterr = 0;

    while ((opt = getopt_long(argc, argv, "t:",
                              long_options, &option_index)) != EOF) {
        if (opt == 't') {
            tmplt = optarg;
        } else if (opt == '?') {
            rrd_set_error("unknown option '%s'", argv[optind-1]);
            return -1;
        }
        option_index = 0;
    }

    /* need at least 2 arguments: filename, data. */
    if (argc - optind < 2) {
        rrd_set_error("Not enough arguments");
        return -1;
    }

    return rrd_update_r(argv[optind], tmplt,
                        argc - optind - 1, argv + optind + 1);
}
188 int
189 rrd_update_r(char *filename, char *tmplt, int argc, char **argv)
190 {
191 return _rrd_update(filename, tmplt, argc, argv, NULL);
192 }
194 int
195 _rrd_update(char *filename, char *tmplt, int argc, char **argv,
196 info_t *pcdp_summary)
197 {
199 int arg_i = 2;
200 short j;
201 unsigned long i,ii,iii=1;
203 unsigned long rra_begin; /* byte pointer to the rra
204 * area in the rrd file. this
205 * pointer never changes value */
206 unsigned long rra_start; /* byte pointer to the rra
207 * area in the rrd file. this
208 * pointer changes as each rrd is
209 * processed. */
210 unsigned long rra_current; /* byte pointer to the current write
211 * spot in the rrd file. */
212 unsigned long rra_pos_tmp; /* temporary byte pointer. */
213 double interval,
214 pre_int,post_int; /* interval between this and
215 * the last run */
216 unsigned long proc_pdp_st; /* which pdp_st was the last
217 * to be processed */
218 unsigned long occu_pdp_st; /* when was the pdp_st
219 * before the last update
220 * time */
221 unsigned long proc_pdp_age; /* how old was the data in
222 * the pdp prep area when it
223 * was last updated */
224 unsigned long occu_pdp_age; /* how long ago was the last
225 * pdp_step time */
226 rrd_value_t *pdp_new; /* prepare the incoming data
227 * to be added the the
228 * existing entry */
229 rrd_value_t *pdp_temp; /* prepare the pdp values
230 * to be added the the
231 * cdp values */
233 long *tmpl_idx; /* index representing the settings
234 transported by the tmplt index */
235 unsigned long tmpl_cnt = 2; /* time and data */
237 FILE *rrd_file;
238 rrd_t rrd;
239 time_t current_time = 0;
240 time_t rra_time = 0; /* time of update for a RRA */
241 unsigned long current_time_usec=0;/* microseconds part of current time */
242 struct timeval tmp_time; /* used for time conversion */
244 char **updvals;
245 int schedule_smooth = 0;
246 rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
247 /* a vector of future Holt-Winters seasonal coefs */
248 unsigned long elapsed_pdp_st;
249 /* number of elapsed PDP steps since last update */
250 unsigned long *rra_step_cnt = NULL;
251 /* number of rows to be updated in an RRA for a data
252 * value. */
253 unsigned long start_pdp_offset;
254 /* number of PDP steps since the last update that
255 * are assigned to the first CDP to be generated
256 * since the last update. */
257 unsigned short scratch_idx;
258 /* index into the CDP scratch array */
259 enum cf_en current_cf;
260 /* numeric id of the current consolidation function */
261 rpnstack_t rpnstack; /* used for COMPUTE DS */
262 int version; /* rrd version */
263 char *endptr; /* used in the conversion */
264 #ifdef HAVE_MMAP
265 void *rrd_mmaped_file;
266 unsigned long rrd_filesize;
267 #endif
270 rpnstack_init(&rpnstack);
272 /* need at least 1 arguments: data. */
273 if (argc < 1) {
274 rrd_set_error("Not enough arguments");
275 return -1;
276 }
280 if(rrd_open(filename,&rrd_file,&rrd, RRD_READWRITE)==-1){
281 return -1;
282 }
283 /* initialize time */
284 version = atoi(rrd.stat_head->version);
285 gettimeofday(&tmp_time, 0);
286 normalize_time(&tmp_time);
287 current_time = tmp_time.tv_sec;
288 if(version >= 3) {
289 current_time_usec = tmp_time.tv_usec;
290 }
291 else {
292 current_time_usec = 0;
293 }
295 rra_current = rra_start = rra_begin = ftell(rrd_file);
296 /* This is defined in the ANSI C standard, section 7.9.5.3:
298 When a file is opened with udpate mode ('+' as the second
299 or third character in the ... list of mode argument
300 variables), both input and ouptut may be performed on the
301 associated stream. However, ... input may not be directly
302 followed by output without an intervening call to a file
303 positioning function, unless the input oepration encounters
304 end-of-file. */
305 #ifdef HAVE_MMAP
306 fseek(rrd_file, 0, SEEK_END);
307 rrd_filesize = ftell(rrd_file);
308 fseek(rrd_file, rra_current, SEEK_SET);
309 #else
310 fseek(rrd_file, 0, SEEK_CUR);
311 #endif
314 /* get exclusive lock to whole file.
315 * lock gets removed when we close the file.
316 */
317 if (LockRRD(rrd_file) != 0) {
318 rrd_set_error("could not lock RRD");
319 rrd_free(&rrd);
320 fclose(rrd_file);
321 return(-1);
322 }
324 if((updvals = malloc( sizeof(char*) * (rrd.stat_head->ds_cnt+1)))==NULL){
325 rrd_set_error("allocating updvals pointer array");
326 rrd_free(&rrd);
327 fclose(rrd_file);
328 return(-1);
329 }
331 if ((pdp_temp = malloc(sizeof(rrd_value_t)
332 *rrd.stat_head->ds_cnt))==NULL){
333 rrd_set_error("allocating pdp_temp ...");
334 free(updvals);
335 rrd_free(&rrd);
336 fclose(rrd_file);
337 return(-1);
338 }
340 if ((tmpl_idx = malloc(sizeof(unsigned long)
341 *(rrd.stat_head->ds_cnt+1)))==NULL){
342 rrd_set_error("allocating tmpl_idx ...");
343 free(pdp_temp);
344 free(updvals);
345 rrd_free(&rrd);
346 fclose(rrd_file);
347 return(-1);
348 }
349 /* initialize tmplt redirector */
350 /* default config example (assume DS 1 is a CDEF DS)
351 tmpl_idx[0] -> 0; (time)
352 tmpl_idx[1] -> 1; (DS 0)
353 tmpl_idx[2] -> 3; (DS 2)
354 tmpl_idx[3] -> 4; (DS 3) */
355 tmpl_idx[0] = 0; /* time */
356 for (i = 1, ii = 1 ; i <= rrd.stat_head->ds_cnt ; i++)
357 {
358 if (dst_conv(rrd.ds_def[i-1].dst) != DST_CDEF)
359 tmpl_idx[ii++]=i;
360 }
361 tmpl_cnt= ii;
363 if (tmplt) {
364 /* we should work on a writeable copy here */
365 char *dsname;
366 unsigned int tmpl_len;
367 tmplt = strdup(tmplt);
368 dsname = tmplt;
369 tmpl_cnt = 1; /* the first entry is the time */
370 tmpl_len = strlen(tmplt);
371 for(i=0;i<=tmpl_len ;i++) {
372 if (tmplt[i] == ':' || tmplt[i] == '\0') {
373 tmplt[i] = '\0';
374 if (tmpl_cnt>rrd.stat_head->ds_cnt){
375 rrd_set_error("tmplt contains more DS definitions than RRD");
376 free(updvals); free(pdp_temp);
377 free(tmpl_idx); rrd_free(&rrd);
378 fclose(rrd_file); return(-1);
379 }
380 if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd,dsname)) == -1){
381 rrd_set_error("unknown DS name '%s'",dsname);
382 free(updvals); free(pdp_temp);
383 free(tmplt);
384 free(tmpl_idx); rrd_free(&rrd);
385 fclose(rrd_file); return(-1);
386 } else {
387 /* the first element is always the time */
388 tmpl_idx[tmpl_cnt-1]++;
389 /* go to the next entry on the tmplt */
390 dsname = &tmplt[i+1];
391 /* fix the damage we did before */
392 if (i<tmpl_len) {
393 tmplt[i]=':';
394 }
396 }
397 }
398 }
399 free(tmplt);
400 }
401 if ((pdp_new = malloc(sizeof(rrd_value_t)
402 *rrd.stat_head->ds_cnt))==NULL){
403 rrd_set_error("allocating pdp_new ...");
404 free(updvals);
405 free(pdp_temp);
406 free(tmpl_idx);
407 rrd_free(&rrd);
408 fclose(rrd_file);
409 return(-1);
410 }
412 #ifdef HAVE_MMAP
413 rrd_mmaped_file = mmap(0,
414 rrd_filesize,
415 PROT_READ | PROT_WRITE,
416 MAP_SHARED,
417 fileno(rrd_file),
418 0);
419 if (rrd_mmaped_file == MAP_FAILED) {
420 rrd_set_error("error mmapping file %s", filename);
421 free(updvals);
422 free(pdp_temp);
423 free(tmpl_idx);
424 rrd_free(&rrd);
425 fclose(rrd_file);
426 return(-1);
427 }
428 #endif
429 /* loop through the arguments. */
430 for(arg_i=0; arg_i<argc;arg_i++) {
431 char *stepper = strdup(argv[arg_i]);
432 char *step_start = stepper;
433 char *p;
434 char *parsetime_error = NULL;
435 enum {atstyle, normal} timesyntax;
436 struct rrd_time_value ds_tv;
437 if (stepper == NULL){
438 rrd_set_error("failed duplication argv entry");
439 free(step_start);
440 free(updvals);
441 free(pdp_temp);
442 free(tmpl_idx);
443 rrd_free(&rrd);
444 #ifdef HAVE_MMAP
445 munmap(rrd_mmaped_file, rrd_filesize);
446 #endif
447 fclose(rrd_file);
448 return(-1);
449 }
450 /* initialize all ds input to unknown except the first one
451 which has always got to be set */
452 for(ii=1;ii<=rrd.stat_head->ds_cnt;ii++) updvals[ii] = "U";
453 updvals[0]=stepper;
454 /* separate all ds elements; first must be examined separately
455 due to alternate time syntax */
456 if ((p=strchr(stepper,'@'))!=NULL) {
457 timesyntax = atstyle;
458 *p = '\0';
459 stepper = p+1;
460 } else if ((p=strchr(stepper,':'))!=NULL) {
461 timesyntax = normal;
462 *p = '\0';
463 stepper = p+1;
464 } else {
465 rrd_set_error("expected timestamp not found in data source from %s",
466 argv[arg_i]);
467 free(step_start);
468 break;
469 }
470 ii=1;
471 updvals[tmpl_idx[ii]] = stepper;
472 while (*stepper) {
473 if (*stepper == ':') {
474 *stepper = '\0';
475 ii++;
476 if (ii<tmpl_cnt){
477 updvals[tmpl_idx[ii]] = stepper+1;
478 }
479 }
480 stepper++;
481 }
483 if (ii != tmpl_cnt-1) {
484 rrd_set_error("expected %lu data source readings (got %lu) from %s",
485 tmpl_cnt-1, ii, argv[arg_i]);
486 free(step_start);
487 break;
488 }
490 /* get the time from the reading ... handle N */
491 if (timesyntax == atstyle) {
492 if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
493 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error );
494 free(step_start);
495 break;
496 }
497 if (ds_tv.type == RELATIVE_TO_END_TIME ||
498 ds_tv.type == RELATIVE_TO_START_TIME) {
499 rrd_set_error("specifying time relative to the 'start' "
500 "or 'end' makes no sense here: %s",
501 updvals[0]);
502 free(step_start);
503 break;
504 }
506 current_time = mktime(&ds_tv.tm) + ds_tv.offset;
507 current_time_usec = 0; /* FIXME: how to handle usecs here ? */
509 } else if (strcmp(updvals[0],"N")==0){
510 gettimeofday(&tmp_time, 0);
511 normalize_time(&tmp_time);
512 current_time = tmp_time.tv_sec;
513 current_time_usec = tmp_time.tv_usec;
514 } else {
515 double tmp;
516 tmp = strtod(updvals[0], 0);
517 current_time = floor(tmp);
518 current_time_usec = (long)((tmp-(double)current_time) * 1000000.0);
519 }
520 /* dont do any correction for old version RRDs */
521 if(version < 3)
522 current_time_usec = 0;
524 if(current_time < rrd.live_head->last_up ||
525 (current_time == rrd.live_head->last_up &&
526 (long)current_time_usec <= (long)rrd.live_head->last_up_usec)) {
527 rrd_set_error("illegal attempt to update using time %ld when "
528 "last update time is %ld (minimum one second step)",
529 current_time, rrd.live_head->last_up);
530 free(step_start);
531 break;
532 }
535 /* seek to the beginning of the rra's */
536 if (rra_current != rra_begin) {
537 #ifndef HAVE_MMAP
538 if(fseek(rrd_file, rra_begin, SEEK_SET) != 0) {
539 rrd_set_error("seek error in rrd");
540 free(step_start);
541 break;
542 }
543 #endif
544 rra_current = rra_begin;
545 }
546 rra_start = rra_begin;
548 /* when was the current pdp started */
549 proc_pdp_age = rrd.live_head->last_up % rrd.stat_head->pdp_step;
550 proc_pdp_st = rrd.live_head->last_up - proc_pdp_age;
552 /* when did the last pdp_st occur */
553 occu_pdp_age = current_time % rrd.stat_head->pdp_step;
554 occu_pdp_st = current_time - occu_pdp_age;
556 /* interval = current_time - rrd.live_head->last_up; */
557 interval = (double)(current_time - rrd.live_head->last_up)
558 + (double)((long)current_time_usec - (long)rrd.live_head->last_up_usec)/1000000.0;
560 if (occu_pdp_st > proc_pdp_st){
561 /* OK we passed the pdp_st moment*/
562 pre_int = (long)occu_pdp_st - rrd.live_head->last_up; /* how much of the input data
563 * occurred before the latest
564 * pdp_st moment*/
565 pre_int -= ((double)rrd.live_head->last_up_usec)/1000000.0; /* adjust usecs */
566 post_int = occu_pdp_age; /* how much after it */
567 post_int += ((double)current_time_usec)/1000000.0; /* adjust usecs */
568 } else {
569 pre_int = interval;
570 post_int = 0;
571 }
573 #ifdef DEBUG
574 printf(
575 "proc_pdp_age %lu\t"
576 "proc_pdp_st %lu\t"
577 "occu_pfp_age %lu\t"
578 "occu_pdp_st %lu\t"
579 "int %lf\t"
580 "pre_int %lf\t"
581 "post_int %lf\n", proc_pdp_age, proc_pdp_st,
582 occu_pdp_age, occu_pdp_st,
583 interval, pre_int, post_int);
584 #endif
586 /* process the data sources and update the pdp_prep
587 * area accordingly */
588 for(i=0;i<rrd.stat_head->ds_cnt;i++){
589 enum dst_en dst_idx;
590 dst_idx= dst_conv(rrd.ds_def[i].dst);
592 /* make sure we do not build diffs with old last_ds values */
593 if(rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt < interval) {
594 strncpy(rrd.pdp_prep[i].last_ds,"U",LAST_DS_LEN-1);
595 rrd.pdp_prep[i].last_ds[LAST_DS_LEN-1]='\0';
596 }
598 /* NOTE: DST_CDEF should never enter this if block, because
599 * updvals[i+1][0] is initialized to 'U'; unless the caller
600 * accidently specified a value for the DST_CDEF. To handle
601 * this case, an extra check is required. */
603 if((updvals[i+1][0] != 'U') &&
604 (dst_idx != DST_CDEF) &&
605 rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {
606 double rate = DNAN;
607 /* the data source type defines how to process the data */
608 /* pdp_new contains rate * time ... eg the bytes
609 * transferred during the interval. Doing it this way saves
610 * a lot of math operations */
613 switch(dst_idx){
614 case DST_COUNTER:
615 case DST_DERIVE:
616 if(rrd.pdp_prep[i].last_ds[0] != 'U'){
617 for(ii=0;updvals[i+1][ii] != '\0';ii++){
618 if(updvals[i+1][ii] < '0' || updvals[i+1][ii] > '9' || (ii==0 && updvals[i+1][ii] == '-')){
619 rrd_set_error("not a simple integer: '%s'",updvals[i+1]);
620 break;
621 }
622 }
623 if (rrd_test_error()){
624 break;
625 }
626 pdp_new[i]= rrd_diff(updvals[i+1],rrd.pdp_prep[i].last_ds);
627 if(dst_idx == DST_COUNTER) {
628 /* simple overflow catcher suggested by Andres Kroonmaa */
629 /* this will fail terribly for non 32 or 64 bit counters ... */
630 /* are there any others in SNMP land ? */
631 if (pdp_new[i] < (double)0.0 )
632 pdp_new[i] += (double)4294967296.0 ; /* 2^32 */
633 if (pdp_new[i] < (double)0.0 )
634 pdp_new[i] += (double)18446744069414584320.0; /* 2^64-2^32 */;
635 }
636 rate = pdp_new[i] / interval;
637 }
638 else {
639 pdp_new[i]= DNAN;
640 }
641 break;
642 case DST_ABSOLUTE:
643 errno = 0;
644 pdp_new[i] = strtod(updvals[i+1],&endptr);
645 if (errno > 0){
646 rrd_set_error("converting '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
647 break;
648 };
649 if (endptr[0] != '\0'){
650 rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
651 break;
652 }
653 rate = pdp_new[i] / interval;
654 break;
655 case DST_GAUGE:
656 errno = 0;
657 pdp_new[i] = strtod(updvals[i+1],&endptr) * interval;
658 if (errno > 0){
659 rrd_set_error("converting '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
660 break;
661 };
662 if (endptr[0] != '\0'){
663 rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
664 break;
665 }
666 rate = pdp_new[i] / interval;
667 break;
668 default:
669 rrd_set_error("rrd contains unknown DS type : '%s'",
670 rrd.ds_def[i].dst);
671 break;
672 }
673 /* break out of this for loop if the error string is set */
674 if (rrd_test_error()){
675 break;
676 }
677 /* make sure pdp_temp is neither too large or too small
678 * if any of these occur it becomes unknown ...
679 * sorry folks ... */
680 if ( ! isnan(rate) &&
681 (( ! isnan(rrd.ds_def[i].par[DS_max_val].u_val) &&
682 rate > rrd.ds_def[i].par[DS_max_val].u_val ) ||
683 ( ! isnan(rrd.ds_def[i].par[DS_min_val].u_val) &&
684 rate < rrd.ds_def[i].par[DS_min_val].u_val ))){
685 pdp_new[i] = DNAN;
686 }
687 } else {
688 /* no news is news all the same */
689 pdp_new[i] = DNAN;
690 }
693 /* make a copy of the command line argument for the next run */
694 #ifdef DEBUG
695 fprintf(stderr,
696 "prep ds[%lu]\t"
697 "last_arg '%s'\t"
698 "this_arg '%s'\t"
699 "pdp_new %10.2f\n",
700 i,
701 rrd.pdp_prep[i].last_ds,
702 updvals[i+1], pdp_new[i]);
703 #endif
704 strncpy(rrd.pdp_prep[i].last_ds, updvals[i+1],LAST_DS_LEN-1);
705 rrd.pdp_prep[i].last_ds[LAST_DS_LEN-1]='\0';
706 }
707 /* break out of the argument parsing loop if the error_string is set */
708 if (rrd_test_error()){
709 free(step_start);
710 break;
711 }
712 /* has a pdp_st moment occurred since the last run ? */
714 if (proc_pdp_st == occu_pdp_st){
715 /* no we have not passed a pdp_st moment. therefore update is simple */
717 for(i=0;i<rrd.stat_head->ds_cnt;i++){
718 if(isnan(pdp_new[i])) {
719 /* this is not realy accurate if we use subsecond data arival time
720 should have thought of it when going subsecond resolution ...
721 sorry next format change we will have it! */
722 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += floor(interval);
723 } else {
724 if (isnan( rrd.pdp_prep[i].scratch[PDP_val].u_val )){
725 rrd.pdp_prep[i].scratch[PDP_val].u_val= pdp_new[i];
726 } else {
727 rrd.pdp_prep[i].scratch[PDP_val].u_val+= pdp_new[i];
728 }
729 }
730 #ifdef DEBUG
731 fprintf(stderr,
732 "NO PDP ds[%lu]\t"
733 "value %10.2f\t"
734 "unkn_sec %5lu\n",
735 i,
736 rrd.pdp_prep[i].scratch[PDP_val].u_val,
737 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
738 #endif
739 }
740 } else {
741 /* an pdp_st has occurred. */
743 /* in pdp_prep[].scratch[PDP_val].u_val we have collected rate*seconds which
744 * occurred up to the last run.
745 pdp_new[] contains rate*seconds from the latest run.
746 pdp_temp[] will contain the rate for cdp */
748 for(i=0;i<rrd.stat_head->ds_cnt;i++){
749 /* update pdp_prep to the current pdp_st. */
750 double pre_unknown = 0.0;
751 if(isnan(pdp_new[i]))
752 /* a final bit of unkonwn to be added bevore calculation
753 * we use a tempaorary variable for this so that we
754 * don't have to turn integer lines before using the value */
755 pre_unknown = pre_int;
756 else {
757 if (isnan( rrd.pdp_prep[i].scratch[PDP_val].u_val )){
758 rrd.pdp_prep[i].scratch[PDP_val].u_val= pdp_new[i]/interval*pre_int;
759 } else {
760 rrd.pdp_prep[i].scratch[PDP_val].u_val+= pdp_new[i]/interval*pre_int;
761 }
762 }
765 /* if too much of the pdp_prep is unknown we dump it */
766 if (
767 /* removed because this does not agree with the definition
768 a heart beat can be unknown */
769 /* (rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt
770 > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) || */
771 /* if the interval is larger thatn mrhb we get NAN */
772 (interval > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) ||
773 (occu_pdp_st-proc_pdp_st <=
774 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)) {
775 pdp_temp[i] = DNAN;
776 } else {
777 pdp_temp[i] = rrd.pdp_prep[i].scratch[PDP_val].u_val
778 / ((double)(occu_pdp_st - proc_pdp_st
779 - rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)
780 -pre_unknown);
781 }
783 /* process CDEF data sources; remember each CDEF DS can
784 * only reference other DS with a lower index number */
785 if (dst_conv(rrd.ds_def[i].dst) == DST_CDEF) {
786 rpnp_t *rpnp;
787 rpnp = rpn_expand((rpn_cdefds_t *) &(rrd.ds_def[i].par[DS_cdef]));
788 /* substitue data values for OP_VARIABLE nodes */
789 for (ii = 0; rpnp[ii].op != OP_END; ii++)
790 {
791 if (rpnp[ii].op == OP_VARIABLE) {
792 rpnp[ii].op = OP_NUMBER;
793 rpnp[ii].val = pdp_temp[rpnp[ii].ptr];
794 }
795 }
796 /* run the rpn calculator */
797 if (rpn_calc(rpnp,&rpnstack,0,pdp_temp,i) == -1) {
798 free(rpnp);
799 break; /* exits the data sources pdp_temp loop */
800 }
801 }
803 /* make pdp_prep ready for the next run */
804 if(isnan(pdp_new[i])){
805 /* this is not realy accurate if we use subsecond data arival time
806 should have thought of it when going subsecond resolution ...
807 sorry next format change we will have it! */
808 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = floor(post_int);
809 rrd.pdp_prep[i].scratch[PDP_val].u_val = DNAN;
810 } else {
811 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = 0;
812 rrd.pdp_prep[i].scratch[PDP_val].u_val =
813 pdp_new[i]/interval*post_int;
814 }
816 #ifdef DEBUG
817 fprintf(stderr,
818 "PDP UPD ds[%lu]\t"
819 "pdp_temp %10.2f\t"
820 "new_prep %10.2f\t"
821 "new_unkn_sec %5lu\n",
822 i, pdp_temp[i],
823 rrd.pdp_prep[i].scratch[PDP_val].u_val,
824 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
825 #endif
826 }
828 /* if there were errors during the last loop, bail out here */
829 if (rrd_test_error()){
830 free(step_start);
831 break;
832 }
834 /* compute the number of elapsed pdp_st moments */
835 elapsed_pdp_st = (occu_pdp_st - proc_pdp_st) / rrd.stat_head -> pdp_step;
836 #ifdef DEBUG
837 fprintf(stderr,"elapsed PDP steps: %lu\n", elapsed_pdp_st);
838 #endif
839 if (rra_step_cnt == NULL)
840 {
841 rra_step_cnt = (unsigned long *)
842 malloc((rrd.stat_head->rra_cnt)* sizeof(unsigned long));
843 }
845 for(i = 0, rra_start = rra_begin;
846 i < rrd.stat_head->rra_cnt;
847 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
848 i++)
849 {
850 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
851 start_pdp_offset = rrd.rra_def[i].pdp_cnt -
852 (proc_pdp_st / rrd.stat_head -> pdp_step) % rrd.rra_def[i].pdp_cnt;
853 if (start_pdp_offset <= elapsed_pdp_st) {
854 rra_step_cnt[i] = (elapsed_pdp_st - start_pdp_offset) /
855 rrd.rra_def[i].pdp_cnt + 1;
856 } else {
857 rra_step_cnt[i] = 0;
858 }
860 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
861 {
862 /* If this is a bulk update, we need to skip ahead in the seasonal
863 * arrays so that they will be correct for the next observed value;
864 * note that for the bulk update itself, no update will occur to
865 * DEVSEASONAL or SEASONAL; futhermore, HWPREDICT and DEVPREDICT will
866 * be set to DNAN. */
867 if (rra_step_cnt[i] > 2)
868 {
869 /* skip update by resetting rra_step_cnt[i],
870 * note that this is not data source specific; this is due
871 * to the bulk update, not a DNAN value for the specific data
872 * source. */
873 rra_step_cnt[i] = 0;
874 lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st,
875 &last_seasonal_coef);
876 lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st + 1,
877 &seasonal_coef);
878 }
880 /* periodically run a smoother for seasonal effects */
881 /* Need to use first cdp parameter buffer to track
882 * burnin (burnin requires a specific smoothing schedule).
883 * The CDP_init_seasonal parameter is really an RRA level,
884 * not a data source within RRA level parameter, but the rra_def
885 * is read only for rrd_update (not flushed to disk). */
886 iii = i*(rrd.stat_head -> ds_cnt);
887 if (rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt
888 <= BURNIN_CYCLES)
889 {
890 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
891 > rrd.rra_def[i].row_cnt - 1) {
892 /* mark off one of the burnin cycles */
893 ++(rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt);
894 schedule_smooth = 1;
895 }
896 } else {
897 /* someone has no doubt invented a trick to deal with this
898 * wrap around, but at least this code is clear. */
899 if (rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt >
900 rrd.rra_ptr[i].cur_row)
901 {
902 /* here elapsed_pdp_st = rra_step_cnt[i] because of 1-1
903 * mapping between PDP and CDP */
904 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
905 >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
906 {
907 #ifdef DEBUG
908 fprintf(stderr,
909 "schedule_smooth 1: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
910 rrd.rra_ptr[i].cur_row, elapsed_pdp_st,
911 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
912 #endif
913 schedule_smooth = 1;
914 }
915 } else {
916 /* can't rely on negative numbers because we are working with
917 * unsigned values */
918 /* Don't need modulus here. If we've wrapped more than once, only
919 * one smooth is executed at the end. */
920 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st >= rrd.rra_def[i].row_cnt
921 && rrd.rra_ptr[i].cur_row + elapsed_pdp_st - rrd.rra_def[i].row_cnt
922 >= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
923 {
924 #ifdef DEBUG
925 fprintf(stderr,
926 "schedule_smooth 2: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
927 rrd.rra_ptr[i].cur_row, elapsed_pdp_st,
928 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
929 #endif
930 schedule_smooth = 1;
931 }
932 }
933 }
935 rra_current = ftell(rrd_file);
936 } /* if cf is DEVSEASONAL or SEASONAL */
938 if (rrd_test_error()) break;
940 /* update CDP_PREP areas */
941 /* loop over data soures within each RRA */
942 for(ii = 0;
943 ii < rrd.stat_head->ds_cnt;
944 ii++)
945 {
947 /* iii indexes the CDP prep area for this data source within the RRA */
948 iii=i*rrd.stat_head->ds_cnt+ii;
950 if (rrd.rra_def[i].pdp_cnt > 1) {
952 if (rra_step_cnt[i] > 0) {
953 /* If we are in this block, as least 1 CDP value will be written to
954 * disk, this is the CDP_primary_val entry. If more than 1 value needs
955 * to be written, then the "fill in" value is the CDP_secondary_val
956 * entry. */
957 if (isnan(pdp_temp[ii]))
958 {
959 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += start_pdp_offset;
960 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
961 } else {
962 /* CDP_secondary value is the RRA "fill in" value for intermediary
963 * CDP data entries. No matter the CF, the value is the same because
964 * the average, max, min, and last of a list of identical values is
965 * the same, namely, the value itself. */
966 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = pdp_temp[ii];
967 }
969 if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt
970 > rrd.rra_def[i].pdp_cnt*
971 rrd.rra_def[i].par[RRA_cdp_xff_val].u_val)
972 {
973 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
974 /* initialize carry over */
975 if (current_cf == CF_AVERAGE) {
976 if (isnan(pdp_temp[ii])) {
977 rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
978 } else {
979 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
980 ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
981 }
982 } else {
983 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
984 }
985 } else {
986 rrd_value_t cum_val, cur_val;
987 switch (current_cf) {
988 case CF_AVERAGE:
989 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, 0.0);
990 cur_val = IFDNAN(pdp_temp[ii],0.0);
991 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val =
992 (cum_val + cur_val * start_pdp_offset) /
993 (rrd.rra_def[i].pdp_cnt
994 -rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt);
995 /* initialize carry over value */
996 if (isnan(pdp_temp[ii])) {
997 rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
998 } else {
999 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
1000 ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
1001 }
1002 break;
1003 case CF_MAXIMUM:
1004 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, -DINF);
1005 cur_val = IFDNAN(pdp_temp[ii],-DINF);
1006 #ifdef DEBUG
1007 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
1008 isnan(pdp_temp[ii])) {
1009 fprintf(stderr,
1010 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1011 i,ii);
1012 exit(-1);
1013 }
1014 #endif
1015 if (cur_val > cum_val)
1016 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
1017 else
1018 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
1019 /* initialize carry over value */
1020 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1021 break;
1022 case CF_MINIMUM:
1023 cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, DINF);
1024 cur_val = IFDNAN(pdp_temp[ii],DINF);
1025 #ifdef DEBUG
1026 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
1027 isnan(pdp_temp[ii])) {
1028 fprintf(stderr,
1029 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1030 i,ii);
1031 exit(-1);
1032 }
1033 #endif
1034 if (cur_val < cum_val)
1035 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
1036 else
1037 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
1038 /* initialize carry over value */
1039 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1040 break;
1041 case CF_LAST:
1042 default:
1043 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = pdp_temp[ii];
1044 /* initialize carry over value */
1045 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1046 break;
1047 }
1048 } /* endif meets xff value requirement for a valid value */
1049 /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
1050 * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1051 if (isnan(pdp_temp[ii]))
1052 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt =
1053 (elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt;
1054 else
1055 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 0;
1056 } else /* rra_step_cnt[i] == 0 */
1057 {
1058 #ifdef DEBUG
1059 if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val)) {
1060 fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, DNAN\n",
1061 i,ii);
1062 } else {
1063 fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, %10.2f\n",
1064 i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1065 }
1066 #endif
1067 if (isnan(pdp_temp[ii])) {
1068 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += elapsed_pdp_st;
1069 } else if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val))
1070 {
1071 if (current_cf == CF_AVERAGE) {
1072 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
1073 elapsed_pdp_st;
1074 } else {
1075 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1076 }
1077 #ifdef DEBUG
1078 fprintf(stderr,"Initialize CDP_val for RRA %lu DS %lu: %10.2f\n",
1079 i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1080 #endif
1081 } else {
1082 switch (current_cf) {
1083 case CF_AVERAGE:
1084 rrd.cdp_prep[iii].scratch[CDP_val].u_val += pdp_temp[ii] *
1085 elapsed_pdp_st;
1086 break;
1087 case CF_MINIMUM:
1088 if (pdp_temp[ii] < rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1089 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1090 break;
1091 case CF_MAXIMUM:
1092 if (pdp_temp[ii] > rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1093 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1094 break;
1095 case CF_LAST:
1096 default:
1097 rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1098 break;
1099 }
1100 }
1101 }
1102 } else { /* rrd.rra_def[i].pdp_cnt == 1 */
1103 if (elapsed_pdp_st > 2)
1104 {
1105 switch (current_cf) {
1106 case CF_AVERAGE:
1107 default:
1108 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val=pdp_temp[ii];
1109 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val=pdp_temp[ii];
1110 break;
1111 case CF_SEASONAL:
1112 case CF_DEVSEASONAL:
1113 /* need to update cached seasonal values, so they are consistent
1114 * with the bulk update */
1115 /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1116 * CDP_last_deviation are the same. */
1117 rrd.cdp_prep[iii].scratch[CDP_hw_last_seasonal].u_val =
1118 last_seasonal_coef[ii];
1119 rrd.cdp_prep[iii].scratch[CDP_hw_seasonal].u_val =
1120 seasonal_coef[ii];
1121 break;
1122 case CF_HWPREDICT:
1123 /* need to update the null_count and last_null_count.
1124 * even do this for non-DNAN pdp_temp because the
1125 * algorithm is not learning from batch updates. */
1126 rrd.cdp_prep[iii].scratch[CDP_null_count].u_cnt +=
1127 elapsed_pdp_st;
1128 rrd.cdp_prep[iii].scratch[CDP_last_null_count].u_cnt +=
1129 elapsed_pdp_st - 1;
1130 /* fall through */
1131 case CF_DEVPREDICT:
1132 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
1133 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
1134 break;
1135 case CF_FAILURES:
1136 /* do not count missed bulk values as failures */
1137 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = 0;
1138 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = 0;
1139 /* need to reset violations buffer.
1140 * could do this more carefully, but for now, just
1141 * assume a bulk update wipes away all violations. */
1142 erase_violations(&rrd, iii, i);
1143 break;
1144 }
1145 }
1146 } /* endif rrd.rra_def[i].pdp_cnt == 1 */
1148 if (rrd_test_error()) break;
1150 } /* endif data sources loop */
1151 } /* end RRA Loop */
1153 /* this loop is only entered if elapsed_pdp_st < 3 */
1154 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1155 j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val)
1156 {
1157 for(i = 0, rra_start = rra_begin;
1158 i < rrd.stat_head->rra_cnt;
1159 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1160 i++)
1161 {
1162 if (rrd.rra_def[i].pdp_cnt > 1) continue;
1164 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
1165 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
1166 {
1167 lookup_seasonal(&rrd,i,rra_start,rrd_file,
1168 elapsed_pdp_st + (scratch_idx == CDP_primary_val ? 1 : 2),
1169 &seasonal_coef);
1170 rra_current = ftell(rrd_file);
1171 }
1172 if (rrd_test_error()) break;
1173 /* loop over data soures within each RRA */
1174 for(ii = 0;
1175 ii < rrd.stat_head->ds_cnt;
1176 ii++)
1177 {
1178 update_aberrant_CF(&rrd,pdp_temp[ii],current_cf,
1179 i*(rrd.stat_head->ds_cnt) + ii,i,ii,
1180 scratch_idx, seasonal_coef);
1181 }
1182 } /* end RRA Loop */
1183 if (rrd_test_error()) break;
1184 } /* end elapsed_pdp_st loop */
1186 if (rrd_test_error()) break;
1188 /* Ready to write to disk */
1189 /* Move sequentially through the file, writing one RRA at a time.
1190 * Note this architecture divorces the computation of CDP with
1191 * flushing updated RRA entries to disk. */
1192 for(i = 0, rra_start = rra_begin;
1193 i < rrd.stat_head->rra_cnt;
1194 rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1195 i++) {
1196 /* is there anything to write for this RRA? If not, continue. */
1197 if (rra_step_cnt[i] == 0) continue;
1199 /* write the first row */
1200 #ifdef DEBUG
1201 fprintf(stderr," -- RRA Preseek %ld\n",ftell(rrd_file));
1202 #endif
1203 rrd.rra_ptr[i].cur_row++;
1204 if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
1205 rrd.rra_ptr[i].cur_row = 0; /* wrap around */
1206 /* positition on the first row */
1207 rra_pos_tmp = rra_start +
1208 (rrd.stat_head->ds_cnt)*(rrd.rra_ptr[i].cur_row)*sizeof(rrd_value_t);
1209 if(rra_pos_tmp != rra_current) {
1210 #ifndef HAVE_MMAP
1211 if(fseek(rrd_file, rra_pos_tmp, SEEK_SET) != 0){
1212 rrd_set_error("seek error in rrd");
1213 break;
1214 }
1215 #endif
1216 rra_current = rra_pos_tmp;
1217 }
1219 #ifdef DEBUG
1220 fprintf(stderr," -- RRA Postseek %ld\n",ftell(rrd_file));
1221 #endif
1222 scratch_idx = CDP_primary_val;
1223 if (pcdp_summary != NULL)
1224 {
1225 rra_time = (current_time - current_time
1226 % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1227 - ((rra_step_cnt[i]-1)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1228 }
1229 #ifdef HAVE_MMAP
1230 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1231 pcdp_summary, &rra_time, rrd_mmaped_file);
1232 #else
1233 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1234 pcdp_summary, &rra_time);
1235 #endif
1236 if (rrd_test_error()) break;
1238 /* write other rows of the bulk update, if any */
1239 scratch_idx = CDP_secondary_val;
1240 for ( ; rra_step_cnt[i] > 1; rra_step_cnt[i]--)
1241 {
1242 if (++rrd.rra_ptr[i].cur_row == rrd.rra_def[i].row_cnt)
1243 {
1244 #ifdef DEBUG
1245 fprintf(stderr,"Wraparound for RRA %s, %lu updates left\n",
1246 rrd.rra_def[i].cf_nam, rra_step_cnt[i] - 1);
1247 #endif
1248 /* wrap */
1249 rrd.rra_ptr[i].cur_row = 0;
1250 /* seek back to beginning of current rra */
1251 if (fseek(rrd_file, rra_start, SEEK_SET) != 0)
1252 {
1253 rrd_set_error("seek error in rrd");
1254 break;
1255 }
1256 #ifdef DEBUG
1257 fprintf(stderr," -- Wraparound Postseek %ld\n",ftell(rrd_file));
1258 #endif
1259 rra_current = rra_start;
1260 }
1261 if (pcdp_summary != NULL)
1262 {
1263 rra_time = (current_time - current_time
1264 % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1265 - ((rra_step_cnt[i]-2)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1266 }
1267 #ifdef HAVE_MMAP
1268 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1269 pcdp_summary, &rra_time, rrd_mmaped_file);
1270 #else
1271 pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1272 pcdp_summary, &rra_time);
1273 #endif
1274 }
1276 if (rrd_test_error())
1277 break;
1278 } /* RRA LOOP */
1280 /* break out of the argument parsing loop if error_string is set */
1281 if (rrd_test_error()){
1282 free(step_start);
1283 break;
1284 }
1286 } /* endif a pdp_st has occurred */
1287 rrd.live_head->last_up = current_time;
1288 rrd.live_head->last_up_usec = current_time_usec;
1289 free(step_start);
1290 } /* function argument loop */
1292 if (seasonal_coef != NULL) free(seasonal_coef);
1293 if (last_seasonal_coef != NULL) free(last_seasonal_coef);
1294 if (rra_step_cnt != NULL) free(rra_step_cnt);
1295 rpnstack_free(&rpnstack);
1297 #ifdef HAVE_MMAP
1298 if (munmap(rrd_mmaped_file, rrd_filesize) == -1) {
1299 rrd_set_error("error writing(unmapping) file: %s", filename);
1300 }
1301 #endif
1302 /* if we got here and if there is an error and if the file has not been
1303 * written to, then close things up and return. */
1304 if (rrd_test_error()) {
1305 free(updvals);
1306 free(tmpl_idx);
1307 rrd_free(&rrd);
1308 free(pdp_temp);
1309 free(pdp_new);
1310 fclose(rrd_file);
1311 return(-1);
1312 }
1314 /* aargh ... that was tough ... so many loops ... anyway, its done.
1315 * we just need to write back the live header portion now*/
1317 if (fseek(rrd_file, (sizeof(stat_head_t)
1318 + sizeof(ds_def_t)*rrd.stat_head->ds_cnt
1319 + sizeof(rra_def_t)*rrd.stat_head->rra_cnt),
1320 SEEK_SET) != 0) {
1321 rrd_set_error("seek rrd for live header writeback");
1322 free(updvals);
1323 free(tmpl_idx);
1324 rrd_free(&rrd);
1325 free(pdp_temp);
1326 free(pdp_new);
1327 fclose(rrd_file);
1328 return(-1);
1329 }
1331 if(version >= 3) {
1332 if(fwrite( rrd.live_head,
1333 sizeof(live_head_t), 1, rrd_file) != 1){
1334 rrd_set_error("fwrite live_head to rrd");
1335 free(updvals);
1336 rrd_free(&rrd);
1337 free(tmpl_idx);
1338 free(pdp_temp);
1339 free(pdp_new);
1340 fclose(rrd_file);
1341 return(-1);
1342 }
1343 }
1344 else {
1345 if(fwrite( &rrd.live_head->last_up,
1346 sizeof(time_t), 1, rrd_file) != 1){
1347 rrd_set_error("fwrite live_head to rrd");
1348 free(updvals);
1349 rrd_free(&rrd);
1350 free(tmpl_idx);
1351 free(pdp_temp);
1352 free(pdp_new);
1353 fclose(rrd_file);
1354 return(-1);
1355 }
1356 }
1359 if(fwrite( rrd.pdp_prep,
1360 sizeof(pdp_prep_t),
1361 rrd.stat_head->ds_cnt, rrd_file) != rrd.stat_head->ds_cnt){
1362 rrd_set_error("ftwrite pdp_prep to rrd");
1363 free(updvals);
1364 rrd_free(&rrd);
1365 free(tmpl_idx);
1366 free(pdp_temp);
1367 free(pdp_new);
1368 fclose(rrd_file);
1369 return(-1);
1370 }
1372 if(fwrite( rrd.cdp_prep,
1373 sizeof(cdp_prep_t),
1374 rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt, rrd_file)
1375 != rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt){
1377 rrd_set_error("ftwrite cdp_prep to rrd");
1378 free(updvals);
1379 free(tmpl_idx);
1380 rrd_free(&rrd);
1381 free(pdp_temp);
1382 free(pdp_new);
1383 fclose(rrd_file);
1384 return(-1);
1385 }
1387 if(fwrite( rrd.rra_ptr,
1388 sizeof(rra_ptr_t),
1389 rrd.stat_head->rra_cnt,rrd_file) != rrd.stat_head->rra_cnt){
1390 rrd_set_error("fwrite rra_ptr to rrd");
1391 free(updvals);
1392 free(tmpl_idx);
1393 rrd_free(&rrd);
1394 free(pdp_temp);
1395 free(pdp_new);
1396 fclose(rrd_file);
1397 return(-1);
1398 }
1400 /* OK now close the files and free the memory */
1401 if(fclose(rrd_file) != 0){
1402 rrd_set_error("closing rrd");
1403 free(updvals);
1404 free(tmpl_idx);
1405 rrd_free(&rrd);
1406 free(pdp_temp);
1407 free(pdp_new);
1408 return(-1);
1409 }
1411 /* calling the smoothing code here guarantees at most
1412 * one smoothing operation per rrd_update call. Unfortunately,
1413 * it is possible with bulk updates, or a long-delayed update
1414 * for smoothing to occur off-schedule. This really isn't
1415 * critical except during the burning cycles. */
1416 if (schedule_smooth)
1417 {
1418 rrd_file = fopen(filename,"rb+");
1419 rra_start = rra_begin;
1420 for (i = 0; i < rrd.stat_head -> rra_cnt; ++i)
1421 {
1422 if (cf_conv(rrd.rra_def[i].cf_nam) == CF_DEVSEASONAL ||
1423 cf_conv(rrd.rra_def[i].cf_nam) == CF_SEASONAL)
1424 {
1425 #ifdef DEBUG
1426 fprintf(stderr,"Running smoother for rra %ld\n",i);
1427 #endif
1428 apply_smoother(&rrd,i,rra_start,rrd_file);
1429 if (rrd_test_error())
1430 break;
1431 }
1432 rra_start += rrd.rra_def[i].row_cnt
1433 *rrd.stat_head->ds_cnt*sizeof(rrd_value_t);
1434 }
1435 fclose(rrd_file);
1436 }
1437 rrd_free(&rrd);
1438 free(updvals);
1439 free(tmpl_idx);
1440 free(pdp_new);
1441 free(pdp_temp);
1442 return(0);
1443 }
/*
 * Try to take an exclusive (write) lock covering the whole RRD file.
 *
 * The request does not wait: F_SETLK (POSIX) and _LK_NBLCK (Win32) fail
 * right away when the lock cannot be granted.  The lock is dropped when
 * the file is closed.
 *
 * rrdfile  open stream of the RRD to lock
 *
 * returns 0 on success, non-zero (-1) on failure
 */
int
LockRRD(FILE *rrdfile)
{
    const int fd = fileno(rrdfile);   /* descriptor behind the stream */

#if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
    struct _stat file_info;

    /* _locking() needs a byte count, so the current file size is required */
    if (_fstat(fd, &file_info) != 0) {
        return(-1);
    }
    return(_locking(fd, _LK_NBLCK, file_info.st_size));
#else
    struct flock whole_file;

    whole_file.l_type   = F_WRLCK;    /* exclusive write lock */
    whole_file.l_whence = SEEK_SET;   /* l_start is relative to start of file */
    whole_file.l_start  = 0;          /* begin at the first byte */
    whole_file.l_len    = 0;          /* zero length: lock the whole file */

    return(fcntl(fd, F_SETLK, &whole_file));
#endif
}
1483 #ifdef HAVE_MMAP
1484 info_t
1485 *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1486 unsigned short CDP_scratch_idx,
1487 #ifndef DEBUG
1488 FILE UNUSED(*rrd_file),
1489 #else
1490 FILE *rrd_file,
1491 #endif
1492 info_t *pcdp_summary, time_t *rra_time, void *rrd_mmaped_file)
1493 #else
1494 info_t
1495 *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1496 unsigned short CDP_scratch_idx, FILE *rrd_file,
1497 info_t *pcdp_summary, time_t *rra_time)
1498 #endif
1499 {
1500 unsigned long ds_idx, cdp_idx;
1501 infoval iv;
1503 for (ds_idx = 0; ds_idx < rrd -> stat_head -> ds_cnt; ds_idx++)
1504 {
1505 /* compute the cdp index */
1506 cdp_idx =rra_idx * (rrd -> stat_head->ds_cnt) + ds_idx;
1507 #ifdef DEBUG
1508 fprintf(stderr," -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1509 rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,ftell(rrd_file),
1510 rrd -> rra_def[rra_idx].cf_nam);
1511 #endif
1512 if (pcdp_summary != NULL)
1513 {
1514 iv.u_val = rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
1515 /* append info to the return hash */
1516 pcdp_summary = info_push(pcdp_summary,
1517 sprintf_alloc("[%d]RRA[%s][%lu]DS[%s]",
1518 *rra_time, rrd->rra_def[rra_idx].cf_nam,
1519 rrd->rra_def[rra_idx].pdp_cnt, rrd->ds_def[ds_idx].ds_nam),
1520 RD_I_VAL, iv);
1521 }
1522 #ifdef HAVE_MMAP
1523 memcpy((char *)rrd_mmaped_file + *rra_current,
1524 &(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1525 sizeof(rrd_value_t));
1526 #else
1527 if(fwrite(&(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1528 sizeof(rrd_value_t),1,rrd_file) != 1)
1529 {
1530 rrd_set_error("writing rrd");
1531 return 0;
1532 }
1533 #endif
1534 *rra_current += sizeof(rrd_value_t);
1535 }
1536 return (pcdp_summary);
1537 }