1 /*****************************************************************************
2 * RRDtool 1.2.23 Copyright by Tobi Oetiker, 1997-2007
3 *****************************************************************************
4 * rrd_update.c RRD Update Function
5 *****************************************************************************
6 * $Id$
7 *****************************************************************************/
9 #include "rrd_tool.h"
10 #include <sys/types.h>
11 #include <fcntl.h>
12 #ifdef HAVE_MMAP
13 # include <sys/mman.h>
14 #endif
16 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
17 #include <sys/locking.h>
18 #include <sys/stat.h>
19 #include <io.h>
20 #endif
22 #include "rrd_hw.h"
23 #include "rrd_rpncalc.h"
25 #include "rrd_is_thread_safe.h"
26 #include "unused.h"
28 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
29 /*
30 * WIN32 does not have gettimeofday and struct timeval. This is a quick and dirty
31 * replacement.
32 */
33 #include <sys/timeb.h>
35 #ifndef __MINGW32__
/* Seconds/microseconds time pair, declared here for Win32 toolchains
 * other than MinGW (see the enclosing #ifndef __MINGW32__). */
struct timeval {
    time_t    tv_sec;   /* whole seconds */
    long      tv_usec;  /* microsecond part */
};
40 #endif
/* Time zone record used as the type of the second parameter of the
 * Win32 gettimeofday() replacement in this file. */
struct __timezone {
    int       tz_minuteswest;   /* minutes west of Greenwich */
    int       tz_dsttime;       /* kind of DST correction */
};
47 static int gettimeofday(
48 struct timeval *t,
49 struct __timezone *tz)
50 {
52 struct _timeb current_time;
54 _ftime(¤t_time);
56 t->tv_sec = current_time.time;
57 t->tv_usec = current_time.millitm * 1000;
59 return 0;
60 }
62 #endif
/*
 * Normalize a time value as returned by gettimeofday(): if the
 * microsecond part is negative, borrow one second so that tv_usec
 * ends up increased by 1000000 and tv_sec decreased by one.
 * Values whose tv_usec is already >= 0 are left untouched.
 */
static void normalize_time(
    struct timeval *t)
{
    if (t->tv_usec >= 0) {
        return;         /* nothing to fix */
    }

    t->tv_usec += 1000000L;
    t->tv_sec -= 1;
}
76 /* Local prototypes */
77 int LockRRD(
78 int in_file);
80 #ifdef HAVE_MMAP
81 info_t *write_RRA_row(
82 rrd_t *rrd,
83 unsigned long rra_idx,
84 unsigned long *rra_current,
85 unsigned short CDP_scratch_idx,
86 #ifndef DEBUG
87 int UNUSED(in_file),
88 #else
89 int in_file,
90 #endif
91 info_t *pcdp_summary,
92 time_t *rra_time,
93 void *rrd_mmaped_file);
94 #else
95 info_t *write_RRA_row(
96 rrd_t *rrd,
97 unsigned long rra_idx,
98 unsigned long *rra_current,
99 unsigned short CDP_scratch_idx,
100 int in_file,
101 info_t *pcdp_summary,
102 time_t *rra_time);
103 #endif
104 int rrd_update_r(
105 const char *filename,
106 const char *tmplt,
107 int argc,
108 const char **argv);
109 int _rrd_update(
110 const char *filename,
111 const char *tmplt,
112 int argc,
113 const char **argv,
114 info_t *);
/* Yield Y when X is NaN, otherwise X.  X is evaluated twice, so do not
 * pass expressions with side effects.  The expansion deliberately has no
 * trailing semicolon so the macro can be used inside larger expressions. */
#define IFDNAN(X,Y) (isnan(X) ? (Y) : (X))
119 info_t *rrd_update_v(
120 int argc,
121 char **argv)
122 {
123 char *tmplt = NULL;
124 info_t *result = NULL;
125 infoval rc;
127 rc.u_int = -1;
128 optind = 0;
129 opterr = 0; /* initialize getopt */
131 while (1) {
132 static struct option long_options[] = {
133 {"template", required_argument, 0, 't'},
134 {0, 0, 0, 0}
135 };
136 int option_index = 0;
137 int opt;
139 opt = getopt_long(argc, argv, "t:", long_options, &option_index);
141 if (opt == EOF)
142 break;
144 switch (opt) {
145 case 't':
146 tmplt = optarg;
147 break;
149 case '?':
150 rrd_set_error("unknown option '%s'", argv[optind - 1]);
151 goto end_tag;
152 }
153 }
155 /* need at least 2 arguments: filename, data. */
156 if (argc - optind < 2) {
157 rrd_set_error("Not enough arguments");
158 goto end_tag;
159 }
160 rc.u_int = 0;
161 result = info_push(NULL, sprintf_alloc("return_value"), RD_I_INT, rc);
162 rc.u_int = _rrd_update(argv[optind], tmplt,
163 argc - optind - 1,
164 (const char **) (argv + optind + 1), result);
165 result->value.u_int = rc.u_int;
166 end_tag:
167 return result;
168 }
/*
 * rrd_update - command line style entry point for updating an RRD.
 *
 * Recognizes -t <tmplt> / --template <tmplt>.  The first non-option
 * argument is the file name; everything after it is handed on as data
 * arguments to rrd_update_r().
 *
 * Returns the result of rrd_update_r(), or -1 (with the error text set
 * via rrd_set_error()) on an unknown option or when fewer than two
 * non-option arguments are present.
 */
int rrd_update(
    int argc,
    char **argv)
{
    static struct option long_options[] = {
        {"template", required_argument, 0, 't'},
        {0, 0, 0, 0}
    };
    char     *tmplt = NULL;
    int       option_index = 0;
    int       opt;

    /* reset getopt state before scanning */
    optind = 0;
    opterr = 0;

    for (;;) {
        opt = getopt_long(argc, argv, "t:", long_options, &option_index);
        if (opt == EOF) {
            break;
        }

        if (opt == 't') {
            tmplt = optarg;
        } else if (opt == '?') {
            rrd_set_error("unknown option '%s'", argv[optind - 1]);
            return (-1);
        }
    }

    /* a file name and at least one data argument are required */
    if (argc - optind < 2) {
        rrd_set_error("Not enough arguments");
        return -1;
    }

    return rrd_update_r(argv[optind], tmplt,
                        argc - optind - 1,
                        (const char **) (argv + optind + 1));
}
216 int rrd_update_r(
217 const char *filename,
218 const char *tmplt,
219 int argc,
220 const char **argv)
221 {
222 return _rrd_update(filename, tmplt, argc, argv, NULL);
223 }
225 int _rrd_update(
226 const char *filename,
227 const char *tmplt,
228 int argc,
229 const char **argv,
230 info_t *pcdp_summary)
231 {
233 int arg_i = 2;
234 short j;
235 unsigned long i, ii, iii = 1;
237 unsigned long rra_begin; /* byte pointer to the rra
238 * area in the rrd file. this
239 * pointer never changes value */
240 unsigned long rra_start; /* byte pointer to the rra
241 * area in the rrd file. this
242 * pointer changes as each rrd is
243 * processed. */
244 unsigned long rra_current; /* byte pointer to the current write
245 * spot in the rrd file. */
246 unsigned long rra_pos_tmp; /* temporary byte pointer. */
247 double interval, pre_int, post_int; /* interval between this and
248 * the last run */
249 unsigned long proc_pdp_st; /* which pdp_st was the last
250 * to be processed */
251 unsigned long occu_pdp_st; /* when was the pdp_st
252 * before the last update
253 * time */
254 unsigned long proc_pdp_age; /* how old was the data in
255 * the pdp prep area when it
256 * was last updated */
257 unsigned long occu_pdp_age; /* how long ago was the last
258 * pdp_step time */
259 rrd_value_t *pdp_new; /* prepare the incoming data
260 * to be added the the
261 * existing entry */
262 rrd_value_t *pdp_temp; /* prepare the pdp values
263 * to be added the the
264 * cdp values */
266 long *tmpl_idx; /* index representing the settings
267 transported by the tmplt index */
268 unsigned long tmpl_cnt = 2; /* time and data */
270 rrd_t rrd;
271 time_t current_time = 0;
272 time_t rra_time = 0; /* time of update for a RRA */
273 unsigned long current_time_usec = 0; /* microseconds part of current time */
274 struct timeval tmp_time; /* used for time conversion */
276 char **updvals;
277 int schedule_smooth = 0;
278 rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
280 /* a vector of future Holt-Winters seasonal coefs */
281 unsigned long elapsed_pdp_st;
283 /* number of elapsed PDP steps since last update */
284 unsigned long *rra_step_cnt = NULL;
286 /* number of rows to be updated in an RRA for a data
287 * value. */
288 unsigned long start_pdp_offset;
290 /* number of PDP steps since the last update that
291 * are assigned to the first CDP to be generated
292 * since the last update. */
293 unsigned short scratch_idx;
295 /* index into the CDP scratch array */
296 enum cf_en current_cf;
298 /* numeric id of the current consolidation function */
299 rpnstack_t rpnstack; /* used for COMPUTE DS */
300 int version; /* rrd version */
301 char *endptr; /* used in the conversion */
302 rrd_file_t *rrd_file;
304 rpnstack_init(&rpnstack);
306 /* need at least 1 arguments: data. */
307 if (argc < 1) {
308 rrd_set_error("Not enough arguments");
309 return -1;
310 }
312 rrd_file = rrd_open(filename, &rrd, RRD_READWRITE);
313 if (rrd_file == NULL) {
314 return -1;
315 }
317 /* initialize time */
318 version = atoi(rrd.stat_head->version);
319 gettimeofday(&tmp_time, 0);
320 normalize_time(&tmp_time);
321 current_time = tmp_time.tv_sec;
322 if (version >= 3) {
323 current_time_usec = tmp_time.tv_usec;
324 } else {
325 current_time_usec = 0;
326 }
328 rra_current = rra_start = rra_begin = rrd_file->header_len;
329 /* This is defined in the ANSI C standard, section 7.9.5.3:
331 When a file is opened with udpate mode ('+' as the second
332 or third character in the ... list of mode argument
333 variables), both input and output may be performed on the
334 associated stream. However, ... input may not be directly
335 followed by output without an intervening call to a file
336 positioning function, unless the input operation encounters
337 end-of-file. */
338 #if 0 //def HAVE_MMAP
339 rrd_filesize = rrd_file->file_size;
340 fseek(rrd_file->fd, 0, SEEK_END);
341 rrd_filesize = ftell(rrd_file->fd);
342 fseek(rrd_file->fd, rra_current, SEEK_SET);
343 #else
344 // fseek(rrd_file->fd, 0, SEEK_CUR);
345 #endif
348 /* get exclusive lock to whole file.
349 * lock gets removed when we close the file.
350 */
351 if (LockRRD(rrd_file->fd) != 0) {
352 rrd_set_error("could not lock RRD");
353 rrd_free(&rrd);
354 close(rrd_file->fd);
355 return (-1);
356 }
358 if ((updvals =
359 malloc(sizeof(char *) * (rrd.stat_head->ds_cnt + 1))) == NULL) {
360 rrd_set_error("allocating updvals pointer array");
361 rrd_free(&rrd);
362 close(rrd_file->fd);
363 return (-1);
364 }
366 if ((pdp_temp = malloc(sizeof(rrd_value_t)
367 * rrd.stat_head->ds_cnt)) == NULL) {
368 rrd_set_error("allocating pdp_temp ...");
369 free(updvals);
370 rrd_free(&rrd);
371 close(rrd_file->fd);
372 return (-1);
373 }
375 if ((tmpl_idx = malloc(sizeof(unsigned long)
376 * (rrd.stat_head->ds_cnt + 1))) == NULL) {
377 rrd_set_error("allocating tmpl_idx ...");
378 free(pdp_temp);
379 free(updvals);
380 rrd_free(&rrd);
381 close(rrd_file->fd);
382 return (-1);
383 }
384 /* initialize tmplt redirector */
385 /* default config example (assume DS 1 is a CDEF DS)
386 tmpl_idx[0] -> 0; (time)
387 tmpl_idx[1] -> 1; (DS 0)
388 tmpl_idx[2] -> 3; (DS 2)
389 tmpl_idx[3] -> 4; (DS 3) */
390 tmpl_idx[0] = 0; /* time */
391 for (i = 1, ii = 1; i <= rrd.stat_head->ds_cnt; i++) {
392 if (dst_conv(rrd.ds_def[i - 1].dst) != DST_CDEF)
393 tmpl_idx[ii++] = i;
394 }
395 tmpl_cnt = ii;
397 if (tmplt) {
398 /* we should work on a writeable copy here */
399 char *dsname;
400 unsigned int tmpl_len;
401 char *tmplt_copy = strdup(tmplt);
403 dsname = tmplt_copy;
404 tmpl_cnt = 1; /* the first entry is the time */
405 tmpl_len = strlen(tmplt_copy);
406 for (i = 0; i <= tmpl_len; i++) {
407 if (tmplt_copy[i] == ':' || tmplt_copy[i] == '\0') {
408 tmplt_copy[i] = '\0';
409 if (tmpl_cnt > rrd.stat_head->ds_cnt) {
410 rrd_set_error
411 ("tmplt contains more DS definitions than RRD");
412 free(updvals);
413 free(pdp_temp);
414 free(tmpl_idx);
415 rrd_free(&rrd);
416 close(rrd_file->fd);
417 return (-1);
418 }
419 if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd, dsname)) == -1) {
420 rrd_set_error("unknown DS name '%s'", dsname);
421 free(updvals);
422 free(pdp_temp);
423 free(tmplt_copy);
424 free(tmpl_idx);
425 rrd_free(&rrd);
426 close(rrd_file->fd);
427 return (-1);
428 } else {
429 /* the first element is always the time */
430 tmpl_idx[tmpl_cnt - 1]++;
431 /* go to the next entry on the tmplt_copy */
432 dsname = &tmplt_copy[i + 1];
433 /* fix the damage we did before */
434 if (i < tmpl_len) {
435 tmplt_copy[i] = ':';
436 }
438 }
439 }
440 }
441 free(tmplt_copy);
442 }
443 if ((pdp_new = malloc(sizeof(rrd_value_t)
444 * rrd.stat_head->ds_cnt)) == NULL) {
445 rrd_set_error("allocating pdp_new ...");
446 free(updvals);
447 free(pdp_temp);
448 free(tmpl_idx);
449 rrd_free(&rrd);
450 close(rrd_file->fd);
451 return (-1);
452 }
453 #if 0 //def HAVE_MMAP
454 rrd_mmaped_file = mmap(0,
455 rrd_file->file_len,
456 PROT_READ | PROT_WRITE,
457 MAP_SHARED, fileno(in_file), 0);
458 if (rrd_mmaped_file == MAP_FAILED) {
459 rrd_set_error("error mmapping file %s", filename);
460 free(updvals);
461 free(pdp_temp);
462 free(tmpl_idx);
463 rrd_free(&rrd);
464 close(rrd_file->fd);
465 return (-1);
466 }
467 #ifdef USE_MADVISE
468 /* when we use mmaping we tell the kernel the mmap equivalent
469 of POSIX_FADV_RANDOM */
470 madvise(rrd_mmaped_file, rrd_filesize, POSIX_MADV_RANDOM);
471 #endif
472 #endif
473 /* loop through the arguments. */
474 for (arg_i = 0; arg_i < argc; arg_i++) {
475 char *stepper = strdup(argv[arg_i]);
476 char *step_start = stepper;
477 char *p;
478 char *parsetime_error = NULL;
479 enum { atstyle, normal } timesyntax;
480 struct rrd_time_value ds_tv;
482 if (stepper == NULL) {
483 rrd_set_error("failed duplication argv entry");
484 free(step_start);
485 free(updvals);
486 free(pdp_temp);
487 free(tmpl_idx);
488 rrd_free(&rrd);
489 #ifdef HAVE_MMAP
490 rrd_close(rrd_file);
491 #endif
492 close(rrd_file->fd);
493 return (-1);
494 }
495 /* initialize all ds input to unknown except the first one
496 which has always got to be set */
497 for (ii = 1; ii <= rrd.stat_head->ds_cnt; ii++)
498 updvals[ii] = "U";
499 updvals[0] = stepper;
500 /* separate all ds elements; first must be examined separately
501 due to alternate time syntax */
502 if ((p = strchr(stepper, '@')) != NULL) {
503 timesyntax = atstyle;
504 *p = '\0';
505 stepper = p + 1;
506 } else if ((p = strchr(stepper, ':')) != NULL) {
507 timesyntax = normal;
508 *p = '\0';
509 stepper = p + 1;
510 } else {
511 rrd_set_error
512 ("expected timestamp not found in data source from %s",
513 argv[arg_i]);
514 free(step_start);
515 break;
516 }
517 ii = 1;
518 updvals[tmpl_idx[ii]] = stepper;
519 while (*stepper) {
520 if (*stepper == ':') {
521 *stepper = '\0';
522 ii++;
523 if (ii < tmpl_cnt) {
524 updvals[tmpl_idx[ii]] = stepper + 1;
525 }
526 }
527 stepper++;
528 }
530 if (ii != tmpl_cnt - 1) {
531 rrd_set_error
532 ("expected %lu data source readings (got %lu) from %s",
533 tmpl_cnt - 1, ii, argv[arg_i]);
534 free(step_start);
535 break;
536 }
538 /* get the time from the reading ... handle N */
539 if (timesyntax == atstyle) {
540 if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
541 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error);
542 free(step_start);
543 break;
544 }
545 if (ds_tv.type == RELATIVE_TO_END_TIME ||
546 ds_tv.type == RELATIVE_TO_START_TIME) {
547 rrd_set_error("specifying time relative to the 'start' "
548 "or 'end' makes no sense here: %s", updvals[0]);
549 free(step_start);
550 break;
551 }
553 current_time = mktime(&ds_tv.tm) + ds_tv.offset;
555 current_time_usec = 0; /* FIXME: how to handle usecs here ? */
557 } else if (strcmp(updvals[0], "N") == 0) {
558 gettimeofday(&tmp_time, 0);
559 normalize_time(&tmp_time);
560 current_time = tmp_time.tv_sec;
561 current_time_usec = tmp_time.tv_usec;
562 } else {
563 double tmp;
565 tmp = strtod(updvals[0], 0);
566 current_time = floor(tmp);
567 current_time_usec =
568 (long) ((tmp - (double) current_time) * 1000000.0);
569 }
570 /* dont do any correction for old version RRDs */
571 if (version < 3)
572 current_time_usec = 0;
574 if (current_time < rrd.live_head->last_up ||
575 (current_time == rrd.live_head->last_up &&
576 (long) current_time_usec <=
577 (long) rrd.live_head->last_up_usec)) {
578 rrd_set_error("illegal attempt to update using time %ld when "
579 "last update time is %ld (minimum one second step)",
580 current_time, rrd.live_head->last_up);
581 free(step_start);
582 break;
583 }
586 /* seek to the beginning of the rra's */
587 if (rra_current != rra_begin) {
588 #ifndef HAVE_MMAP
589 if (rrd_seek(rrd_file, rra_begin, SEEK_SET) != 0) {
590 rrd_set_error("seek error in rrd");
591 free(step_start);
592 break;
593 }
594 #endif
595 rra_current = rra_begin;
596 }
597 rra_start = rra_begin;
599 /* when was the current pdp started */
600 proc_pdp_age = rrd.live_head->last_up % rrd.stat_head->pdp_step;
601 proc_pdp_st = rrd.live_head->last_up - proc_pdp_age;
603 /* when did the last pdp_st occur */
604 occu_pdp_age = current_time % rrd.stat_head->pdp_step;
605 occu_pdp_st = current_time - occu_pdp_age;
607 /* interval = current_time - rrd.live_head->last_up; */
608 interval = (double) (current_time - rrd.live_head->last_up)
609 + (double) ((long) current_time_usec -
610 (long) rrd.live_head->last_up_usec) / 1000000.0;
612 if (occu_pdp_st > proc_pdp_st) {
613 /* OK we passed the pdp_st moment */
614 pre_int = (long) occu_pdp_st - rrd.live_head->last_up; /* how much of the input data
615 * occurred before the latest
616 * pdp_st moment*/
617 pre_int -= ((double) rrd.live_head->last_up_usec) / 1000000.0; /* adjust usecs */
618 post_int = occu_pdp_age; /* how much after it */
619 post_int += ((double) current_time_usec) / 1000000.0; /* adjust usecs */
620 } else {
621 pre_int = interval;
622 post_int = 0;
623 }
625 #ifdef DEBUG
626 printf("proc_pdp_age %lu\t"
627 "proc_pdp_st %lu\t"
628 "occu_pfp_age %lu\t"
629 "occu_pdp_st %lu\t"
630 "int %lf\t"
631 "pre_int %lf\t"
632 "post_int %lf\n", proc_pdp_age, proc_pdp_st,
633 occu_pdp_age, occu_pdp_st, interval, pre_int, post_int);
634 #endif
636 /* process the data sources and update the pdp_prep
637 * area accordingly */
638 for (i = 0; i < rrd.stat_head->ds_cnt; i++) {
639 enum dst_en dst_idx;
641 dst_idx = dst_conv(rrd.ds_def[i].dst);
643 /* make sure we do not build diffs with old last_ds values */
644 if (rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt < interval) {
645 strncpy(rrd.pdp_prep[i].last_ds, "U", LAST_DS_LEN - 1);
646 rrd.pdp_prep[i].last_ds[LAST_DS_LEN - 1] = '\0';
647 }
649 /* NOTE: DST_CDEF should never enter this if block, because
650 * updvals[i+1][0] is initialized to 'U'; unless the caller
651 * accidently specified a value for the DST_CDEF. To handle
652 * this case, an extra check is required. */
654 if ((updvals[i + 1][0] != 'U') &&
655 (dst_idx != DST_CDEF) &&
656 rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {
657 double rate = DNAN;
659 /* the data source type defines how to process the data */
660 /* pdp_new contains rate * time ... eg the bytes
661 * transferred during the interval. Doing it this way saves
662 * a lot of math operations */
665 switch (dst_idx) {
666 case DST_COUNTER:
667 case DST_DERIVE:
668 if (rrd.pdp_prep[i].last_ds[0] != 'U') {
669 for (ii = 0; updvals[i + 1][ii] != '\0'; ii++) {
670 if ((updvals[i + 1][ii] < '0'
671 || updvals[i + 1][ii] > '9') && (ii != 0
672 && updvals[i
673 +
674 1]
675 [ii] !=
676 '-')) {
677 rrd_set_error("not a simple integer: '%s'",
678 updvals[i + 1]);
679 break;
680 }
681 }
682 if (rrd_test_error()) {
683 break;
684 }
685 pdp_new[i] =
686 rrd_diff(updvals[i + 1], rrd.pdp_prep[i].last_ds);
687 if (dst_idx == DST_COUNTER) {
688 /* simple overflow catcher suggested by Andres Kroonmaa */
689 /* this will fail terribly for non 32 or 64 bit counters ... */
690 /* are there any others in SNMP land ? */
691 if (pdp_new[i] < (double) 0.0)
692 pdp_new[i] += (double) 4294967296.0; /* 2^32 */
693 if (pdp_new[i] < (double) 0.0)
694 pdp_new[i] += (double) 18446744069414584320.0;
695 /* 2^64-2^32 */ ;
696 }
697 rate = pdp_new[i] / interval;
698 } else {
699 pdp_new[i] = DNAN;
700 }
701 break;
702 case DST_ABSOLUTE:
703 errno = 0;
704 pdp_new[i] = strtod(updvals[i + 1], &endptr);
705 if (errno > 0) {
706 rrd_set_error("converting '%s' to float: %s",
707 updvals[i + 1], rrd_strerror(errno));
708 break;
709 };
710 if (endptr[0] != '\0') {
711 rrd_set_error
712 ("conversion of '%s' to float not complete: tail '%s'",
713 updvals[i + 1], endptr);
714 break;
715 }
716 rate = pdp_new[i] / interval;
717 break;
718 case DST_GAUGE:
719 errno = 0;
720 pdp_new[i] = strtod(updvals[i + 1], &endptr) * interval;
721 if (errno > 0) {
722 rrd_set_error("converting '%s' to float: %s",
723 updvals[i + 1], rrd_strerror(errno));
724 break;
725 };
726 if (endptr[0] != '\0') {
727 rrd_set_error
728 ("conversion of '%s' to float not complete: tail '%s'",
729 updvals[i + 1], endptr);
730 break;
731 }
732 rate = pdp_new[i] / interval;
733 break;
734 default:
735 rrd_set_error("rrd contains unknown DS type : '%s'",
736 rrd.ds_def[i].dst);
737 break;
738 }
739 /* break out of this for loop if the error string is set */
740 if (rrd_test_error()) {
741 break;
742 }
743 /* make sure pdp_temp is neither too large or too small
744 * if any of these occur it becomes unknown ...
745 * sorry folks ... */
746 if (!isnan(rate) &&
747 ((!isnan(rrd.ds_def[i].par[DS_max_val].u_val) &&
748 rate > rrd.ds_def[i].par[DS_max_val].u_val) ||
749 (!isnan(rrd.ds_def[i].par[DS_min_val].u_val) &&
750 rate < rrd.ds_def[i].par[DS_min_val].u_val))) {
751 pdp_new[i] = DNAN;
752 }
753 } else {
754 /* no news is news all the same */
755 pdp_new[i] = DNAN;
756 }
759 /* make a copy of the command line argument for the next run */
760 #ifdef DEBUG
761 fprintf(stderr,
762 "prep ds[%lu]\t"
763 "last_arg '%s'\t"
764 "this_arg '%s'\t"
765 "pdp_new %10.2f\n",
766 i, rrd.pdp_prep[i].last_ds, updvals[i + 1], pdp_new[i]);
767 #endif
768 strncpy(rrd.pdp_prep[i].last_ds, updvals[i + 1], LAST_DS_LEN - 1);
769 rrd.pdp_prep[i].last_ds[LAST_DS_LEN - 1] = '\0';
770 }
771 /* break out of the argument parsing loop if the error_string is set */
772 if (rrd_test_error()) {
773 free(step_start);
774 break;
775 }
776 /* has a pdp_st moment occurred since the last run ? */
778 if (proc_pdp_st == occu_pdp_st) {
779 /* no we have not passed a pdp_st moment. therefore update is simple */
781 for (i = 0; i < rrd.stat_head->ds_cnt; i++) {
782 if (isnan(pdp_new[i])) {
783 /* this is not realy accurate if we use subsecond data arival time
784 should have thought of it when going subsecond resolution ...
785 sorry next format change we will have it! */
786 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt +=
787 floor(interval);
788 } else {
789 if (isnan(rrd.pdp_prep[i].scratch[PDP_val].u_val)) {
790 rrd.pdp_prep[i].scratch[PDP_val].u_val = pdp_new[i];
791 } else {
792 rrd.pdp_prep[i].scratch[PDP_val].u_val += pdp_new[i];
793 }
794 }
795 #ifdef DEBUG
796 fprintf(stderr,
797 "NO PDP ds[%lu]\t"
798 "value %10.2f\t"
799 "unkn_sec %5lu\n",
800 i,
801 rrd.pdp_prep[i].scratch[PDP_val].u_val,
802 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
803 #endif
804 }
805 } else {
806 /* an pdp_st has occurred. */
808 /* in pdp_prep[].scratch[PDP_val].u_val we have collected rate*seconds which
809 * occurred up to the last run.
810 pdp_new[] contains rate*seconds from the latest run.
811 pdp_temp[] will contain the rate for cdp */
813 for (i = 0; i < rrd.stat_head->ds_cnt; i++) {
814 /* update pdp_prep to the current pdp_st. */
815 double pre_unknown = 0.0;
817 if (isnan(pdp_new[i]))
818 /* a final bit of unkonwn to be added bevore calculation
819 * we use a tempaorary variable for this so that we
820 * don't have to turn integer lines before using the value */
821 pre_unknown = pre_int;
822 else {
823 if (isnan(rrd.pdp_prep[i].scratch[PDP_val].u_val)) {
824 rrd.pdp_prep[i].scratch[PDP_val].u_val =
825 pdp_new[i] / interval * pre_int;
826 } else {
827 rrd.pdp_prep[i].scratch[PDP_val].u_val +=
828 pdp_new[i] / interval * pre_int;
829 }
830 }
833 /* if too much of the pdp_prep is unknown we dump it */
834 if (
835 /* removed because this does not agree with the definition
836 a heart beat can be unknown */
837 /* (rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt
838 > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) || */
839 /* if the interval is larger thatn mrhb we get NAN */
840 (interval > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) ||
841 (occu_pdp_st - proc_pdp_st <=
842 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)) {
843 pdp_temp[i] = DNAN;
844 } else {
845 pdp_temp[i] = rrd.pdp_prep[i].scratch[PDP_val].u_val
846 / ((double) (occu_pdp_st - proc_pdp_st
847 -
848 rrd.pdp_prep[i].
849 scratch[PDP_unkn_sec_cnt].u_cnt)
850 - pre_unknown);
851 }
853 /* process CDEF data sources; remember each CDEF DS can
854 * only reference other DS with a lower index number */
855 if (dst_conv(rrd.ds_def[i].dst) == DST_CDEF) {
856 rpnp_t *rpnp;
858 rpnp =
859 rpn_expand((rpn_cdefds_t *) &
860 (rrd.ds_def[i].par[DS_cdef]));
861 /* substitue data values for OP_VARIABLE nodes */
862 for (ii = 0; rpnp[ii].op != OP_END; ii++) {
863 if (rpnp[ii].op == OP_VARIABLE) {
864 rpnp[ii].op = OP_NUMBER;
865 rpnp[ii].val = pdp_temp[rpnp[ii].ptr];
866 }
867 }
868 /* run the rpn calculator */
869 if (rpn_calc(rpnp, &rpnstack, 0, pdp_temp, i) == -1) {
870 free(rpnp);
871 break; /* exits the data sources pdp_temp loop */
872 }
873 }
875 /* make pdp_prep ready for the next run */
876 if (isnan(pdp_new[i])) {
877 /* this is not realy accurate if we use subsecond data arival time
878 should have thought of it when going subsecond resolution ...
879 sorry next format change we will have it! */
880 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt =
881 floor(post_int);
882 rrd.pdp_prep[i].scratch[PDP_val].u_val = DNAN;
883 } else {
884 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = 0;
885 rrd.pdp_prep[i].scratch[PDP_val].u_val =
886 pdp_new[i] / interval * post_int;
887 }
889 #ifdef DEBUG
890 fprintf(stderr,
891 "PDP UPD ds[%lu]\t"
892 "pdp_temp %10.2f\t"
893 "new_prep %10.2f\t"
894 "new_unkn_sec %5lu\n",
895 i, pdp_temp[i],
896 rrd.pdp_prep[i].scratch[PDP_val].u_val,
897 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
898 #endif
899 }
901 /* if there were errors during the last loop, bail out here */
902 if (rrd_test_error()) {
903 free(step_start);
904 break;
905 }
907 /* compute the number of elapsed pdp_st moments */
908 elapsed_pdp_st =
909 (occu_pdp_st - proc_pdp_st) / rrd.stat_head->pdp_step;
910 #ifdef DEBUG
911 fprintf(stderr, "elapsed PDP steps: %lu\n", elapsed_pdp_st);
912 #endif
913 if (rra_step_cnt == NULL) {
914 rra_step_cnt = (unsigned long *)
915 malloc((rrd.stat_head->rra_cnt) * sizeof(unsigned long));
916 }
918 for (i = 0, rra_start = rra_begin;
919 i < rrd.stat_head->rra_cnt;
920 rra_start +=
921 rrd.rra_def[i].row_cnt * rrd.stat_head->ds_cnt *
922 sizeof(rrd_value_t), i++) {
923 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
924 start_pdp_offset = rrd.rra_def[i].pdp_cnt -
925 (proc_pdp_st / rrd.stat_head->pdp_step) %
926 rrd.rra_def[i].pdp_cnt;
927 if (start_pdp_offset <= elapsed_pdp_st) {
928 rra_step_cnt[i] = (elapsed_pdp_st - start_pdp_offset) /
929 rrd.rra_def[i].pdp_cnt + 1;
930 } else {
931 rra_step_cnt[i] = 0;
932 }
934 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
935 /* If this is a bulk update, we need to skip ahead in the seasonal
936 * arrays so that they will be correct for the next observed value;
937 * note that for the bulk update itself, no update will occur to
938 * DEVSEASONAL or SEASONAL; futhermore, HWPREDICT and DEVPREDICT will
939 * be set to DNAN. */
940 if (rra_step_cnt[i] > 2) {
941 /* skip update by resetting rra_step_cnt[i],
942 * note that this is not data source specific; this is due
943 * to the bulk update, not a DNAN value for the specific data
944 * source. */
945 rra_step_cnt[i] = 0;
946 lookup_seasonal(&rrd, i, rra_start, rrd_file,
947 elapsed_pdp_st, &last_seasonal_coef);
948 lookup_seasonal(&rrd, i, rra_start, rrd_file,
949 elapsed_pdp_st + 1, &seasonal_coef);
950 }
952 /* periodically run a smoother for seasonal effects */
953 /* Need to use first cdp parameter buffer to track
954 * burnin (burnin requires a specific smoothing schedule).
955 * The CDP_init_seasonal parameter is really an RRA level,
956 * not a data source within RRA level parameter, but the rra_def
957 * is read only for rrd_update (not flushed to disk). */
958 iii = i * (rrd.stat_head->ds_cnt);
959 if (rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt
960 <= BURNIN_CYCLES) {
961 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
962 > rrd.rra_def[i].row_cnt - 1) {
963 /* mark off one of the burnin cycles */
964 ++(rrd.cdp_prep[iii].scratch[CDP_init_seasonal].
965 u_cnt);
966 schedule_smooth = 1;
967 }
968 } else {
969 /* someone has no doubt invented a trick to deal with this
970 * wrap around, but at least this code is clear. */
971 if (rrd.rra_def[i].par[RRA_seasonal_smooth_idx].
972 u_cnt > rrd.rra_ptr[i].cur_row) {
973 /* here elapsed_pdp_st = rra_step_cnt[i] because of 1-1
974 * mapping between PDP and CDP */
975 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
976 >=
977 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].
978 u_cnt) {
979 #ifdef DEBUG
980 fprintf(stderr,
981 "schedule_smooth 1: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
982 rrd.rra_ptr[i].cur_row,
983 elapsed_pdp_st,
984 rrd.rra_def[i].
985 par[RRA_seasonal_smooth_idx].u_cnt);
986 #endif
987 schedule_smooth = 1;
988 }
989 } else {
990 /* can't rely on negative numbers because we are working with
991 * unsigned values */
992 /* Don't need modulus here. If we've wrapped more than once, only
993 * one smooth is executed at the end. */
994 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st >=
995 rrd.rra_def[i].row_cnt
996 && rrd.rra_ptr[i].cur_row + elapsed_pdp_st -
997 rrd.rra_def[i].row_cnt >=
998 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].
999 u_cnt) {
1000 #ifdef DEBUG
1001 fprintf(stderr,
1002 "schedule_smooth 2: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
1003 rrd.rra_ptr[i].cur_row,
1004 elapsed_pdp_st,
1005 rrd.rra_def[i].
1006 par[RRA_seasonal_smooth_idx].u_cnt);
1007 #endif
1008 schedule_smooth = 1;
1009 }
1010 }
1011 }
1013 rra_current = rrd_tell(rrd_file);
1014 }
1015 /* if cf is DEVSEASONAL or SEASONAL */
1016 if (rrd_test_error())
1017 break;
1019 /* update CDP_PREP areas */
1020 /* loop over data soures within each RRA */
1021 for (ii = 0; ii < rrd.stat_head->ds_cnt; ii++) {
1023 /* iii indexes the CDP prep area for this data source within the RRA */
1024 iii = i * rrd.stat_head->ds_cnt + ii;
1026 if (rrd.rra_def[i].pdp_cnt > 1) {
1028 if (rra_step_cnt[i] > 0) {
1029 /* If we are in this block, as least 1 CDP value will be written to
1030 * disk, this is the CDP_primary_val entry. If more than 1 value needs
1031 * to be written, then the "fill in" value is the CDP_secondary_val
1032 * entry. */
1033 if (isnan(pdp_temp[ii])) {
1034 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
1035 u_cnt += start_pdp_offset;
1036 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
1037 u_val = DNAN;
1038 } else {
1039 /* CDP_secondary value is the RRA "fill in" value for intermediary
1040 * CDP data entries. No matter the CF, the value is the same because
1041 * the average, max, min, and last of a list of identical values is
1042 * the same, namely, the value itself. */
1043 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
1044 u_val = pdp_temp[ii];
1045 }
1047 if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
1048 u_cnt >
1049 rrd.rra_def[i].pdp_cnt *
1050 rrd.rra_def[i].par[RRA_cdp_xff_val].u_val) {
1051 rrd.cdp_prep[iii].scratch[CDP_primary_val].
1052 u_val = DNAN;
1053 /* initialize carry over */
1054 if (current_cf == CF_AVERAGE) {
1055 if (isnan(pdp_temp[ii])) {
1056 rrd.cdp_prep[iii].scratch[CDP_val].
1057 u_val = DNAN;
1058 } else {
1059 rrd.cdp_prep[iii].scratch[CDP_val].
1060 u_val =
1061 pdp_temp[ii] *
1062 ((elapsed_pdp_st -
1063 start_pdp_offset) %
1064 rrd.rra_def[i].pdp_cnt);
1065 }
1066 } else {
1067 rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1068 pdp_temp[ii];
1069 }
1070 } else {
1071 rrd_value_t cum_val, cur_val;
1073 switch (current_cf) {
1074 case CF_AVERAGE:
1075 cum_val =
1076 IFDNAN(rrd.cdp_prep[iii].
1077 scratch[CDP_val].u_val, 0.0);
1078 cur_val = IFDNAN(pdp_temp[ii], 0.0);
1079 rrd.cdp_prep[iii].
1080 scratch[CDP_primary_val].u_val =
1081 (cum_val +
1082 cur_val * start_pdp_offset) /
1083 (rrd.rra_def[i].pdp_cnt -
1084 rrd.cdp_prep[iii].
1085 scratch[CDP_unkn_pdp_cnt].u_cnt);
1086 /* initialize carry over value */
1087 if (isnan(pdp_temp[ii])) {
1088 rrd.cdp_prep[iii].scratch[CDP_val].
1089 u_val = DNAN;
1090 } else {
1091 rrd.cdp_prep[iii].scratch[CDP_val].
1092 u_val =
1093 pdp_temp[ii] *
1094 ((elapsed_pdp_st -
1095 start_pdp_offset) %
1096 rrd.rra_def[i].pdp_cnt);
1097 }
1098 break;
1099 case CF_MAXIMUM:
1100 cum_val =
1101 IFDNAN(rrd.cdp_prep[iii].
1102 scratch[CDP_val].u_val, -DINF);
1103 cur_val = IFDNAN(pdp_temp[ii], -DINF);
1104 #ifdef DEBUG
1105 if (isnan
1106 (rrd.cdp_prep[iii].scratch[CDP_val].
1107 u_val) && isnan(pdp_temp[ii])) {
1108 fprintf(stderr,
1109 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1110 i, ii);
1111 exit(-1);
1112 }
1113 #endif
1114 if (cur_val > cum_val)
1115 rrd.cdp_prep[iii].
1116 scratch[CDP_primary_val].u_val =
1117 cur_val;
1118 else
1119 rrd.cdp_prep[iii].
1120 scratch[CDP_primary_val].u_val =
1121 cum_val;
1122 /* initialize carry over value */
1123 rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1124 pdp_temp[ii];
1125 break;
1126 case CF_MINIMUM:
1127 cum_val =
1128 IFDNAN(rrd.cdp_prep[iii].
1129 scratch[CDP_val].u_val, DINF);
1130 cur_val = IFDNAN(pdp_temp[ii], DINF);
1131 #ifdef DEBUG
1132 if (isnan
1133 (rrd.cdp_prep[iii].scratch[CDP_val].
1134 u_val) && isnan(pdp_temp[ii])) {
1135 fprintf(stderr,
1136 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1137 i, ii);
1138 exit(-1);
1139 }
1140 #endif
1141 if (cur_val < cum_val)
1142 rrd.cdp_prep[iii].
1143 scratch[CDP_primary_val].u_val =
1144 cur_val;
1145 else
1146 rrd.cdp_prep[iii].
1147 scratch[CDP_primary_val].u_val =
1148 cum_val;
1149 /* initialize carry over value */
1150 rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1151 pdp_temp[ii];
1152 break;
1153 case CF_LAST:
1154 default:
1155 rrd.cdp_prep[iii].
1156 scratch[CDP_primary_val].u_val =
1157 pdp_temp[ii];
1158 /* initialize carry over value */
1159 rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1160 pdp_temp[ii];
1161 break;
1162 }
1163 } /* endif meets xff value requirement for a valid value */
1164 /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
1165 * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1166 if (isnan(pdp_temp[ii]))
1167 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
1168 u_cnt =
1169 (elapsed_pdp_st -
1170 start_pdp_offset) %
1171 rrd.rra_def[i].pdp_cnt;
1172 else
1173 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
1174 u_cnt = 0;
1175 } else { /* rra_step_cnt[i] == 0 */
1177 #ifdef DEBUG
1178 if (isnan
1179 (rrd.cdp_prep[iii].scratch[CDP_val].u_val)) {
1180 fprintf(stderr,
1181 "schedule CDP_val update, RRA %lu DS %lu, DNAN\n",
1182 i, ii);
1183 } else {
1184 fprintf(stderr,
1185 "schedule CDP_val update, RRA %lu DS %lu, %10.2f\n",
1186 i, ii,
1187 rrd.cdp_prep[iii].scratch[CDP_val].
1188 u_val);
1189 }
1190 #endif
1191 if (isnan(pdp_temp[ii])) {
1192 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
1193 u_cnt += elapsed_pdp_st;
1194 } else
1195 if (isnan
1196 (rrd.cdp_prep[iii].scratch[CDP_val].
1197 u_val)) {
1198 if (current_cf == CF_AVERAGE) {
1199 rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1200 pdp_temp[ii] * elapsed_pdp_st;
1201 } else {
1202 rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1203 pdp_temp[ii];
1204 }
1205 #ifdef DEBUG
1206 fprintf(stderr,
1207 "Initialize CDP_val for RRA %lu DS %lu: %10.2f\n",
1208 i, ii,
1209 rrd.cdp_prep[iii].scratch[CDP_val].
1210 u_val);
1211 #endif
1212 } else {
1213 switch (current_cf) {
1214 case CF_AVERAGE:
1215 rrd.cdp_prep[iii].scratch[CDP_val].
1216 u_val +=
1217 pdp_temp[ii] * elapsed_pdp_st;
1218 break;
1219 case CF_MINIMUM:
1220 if (pdp_temp[ii] <
1221 rrd.cdp_prep[iii].scratch[CDP_val].
1222 u_val)
1223 rrd.cdp_prep[iii].scratch[CDP_val].
1224 u_val = pdp_temp[ii];
1225 break;
1226 case CF_MAXIMUM:
1227 if (pdp_temp[ii] >
1228 rrd.cdp_prep[iii].scratch[CDP_val].
1229 u_val)
1230 rrd.cdp_prep[iii].scratch[CDP_val].
1231 u_val = pdp_temp[ii];
1232 break;
1233 case CF_LAST:
1234 default:
1235 rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1236 pdp_temp[ii];
1237 break;
1238 }
1239 }
1240 }
1241 } else { /* rrd.rra_def[i].pdp_cnt == 1 */
1242 if (elapsed_pdp_st > 2) {
1243 switch (current_cf) {
1244 case CF_AVERAGE:
1245 default:
1246 rrd.cdp_prep[iii].scratch[CDP_primary_val].
1247 u_val = pdp_temp[ii];
1248 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
1249 u_val = pdp_temp[ii];
1250 break;
1251 case CF_SEASONAL:
1252 case CF_DEVSEASONAL:
1253 /* need to update cached seasonal values, so they are consistent
1254 * with the bulk update */
1255 /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1256 * CDP_last_deviation are the same. */
1257 rrd.cdp_prep[iii].
1258 scratch[CDP_hw_last_seasonal].u_val =
1259 last_seasonal_coef[ii];
1260 rrd.cdp_prep[iii].scratch[CDP_hw_seasonal].
1261 u_val = seasonal_coef[ii];
1262 break;
1263 case CF_HWPREDICT:
1264 /* need to update the null_count and last_null_count.
1265 * even do this for non-DNAN pdp_temp because the
1266 * algorithm is not learning from batch updates. */
1267 rrd.cdp_prep[iii].scratch[CDP_null_count].
1268 u_cnt += elapsed_pdp_st;
1269 rrd.cdp_prep[iii].
1270 scratch[CDP_last_null_count].u_cnt +=
1271 elapsed_pdp_st - 1;
1272 /* fall through */
1273 case CF_DEVPREDICT:
1274 rrd.cdp_prep[iii].scratch[CDP_primary_val].
1275 u_val = DNAN;
1276 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
1277 u_val = DNAN;
1278 break;
1279 case CF_FAILURES:
1280 /* do not count missed bulk values as failures */
1281 rrd.cdp_prep[iii].scratch[CDP_primary_val].
1282 u_val = 0;
1283 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
1284 u_val = 0;
1285 /* need to reset violations buffer.
1286 * could do this more carefully, but for now, just
1287 * assume a bulk update wipes away all violations. */
1288 erase_violations(&rrd, iii, i);
1289 break;
1290 }
1291 }
1292 } /* endif rrd.rra_def[i].pdp_cnt == 1 */
1294 if (rrd_test_error())
1295 break;
1297 } /* endif data sources loop */
1298 } /* end RRA Loop */
1300 /* this loop is only entered if elapsed_pdp_st < 3 */
1301 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1302 j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val) {
1303 for (i = 0, rra_start = rra_begin;
1304 i < rrd.stat_head->rra_cnt;
1305 rra_start +=
1306 rrd.rra_def[i].row_cnt * rrd.stat_head->ds_cnt *
1307 sizeof(rrd_value_t), i++) {
1308 if (rrd.rra_def[i].pdp_cnt > 1)
1309 continue;
1311 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
1312 if (current_cf == CF_SEASONAL
1313 || current_cf == CF_DEVSEASONAL) {
1314 lookup_seasonal(&rrd, i, rra_start, rrd_file,
1315 elapsed_pdp_st + (scratch_idx ==
1316 CDP_primary_val ? 1
1317 : 2),
1318 &seasonal_coef);
1319 rra_current = rrd_tell(rrd_file);
1320 }
1321 if (rrd_test_error())
1322 break;
1323 /* loop over data soures within each RRA */
1324 for (ii = 0; ii < rrd.stat_head->ds_cnt; ii++) {
1325 update_aberrant_CF(&rrd, pdp_temp[ii], current_cf,
1326 i * (rrd.stat_head->ds_cnt) + ii,
1327 i, ii, scratch_idx, seasonal_coef);
1328 }
1329 } /* end RRA Loop */
1330 if (rrd_test_error())
1331 break;
1332 } /* end elapsed_pdp_st loop */
1334 if (rrd_test_error())
1335 break;
1337 /* Ready to write to disk */
1338 /* Move sequentially through the file, writing one RRA at a time.
1339 * Note this architecture divorces the computation of CDP with
1340 * flushing updated RRA entries to disk. */
1341 for (i = 0, rra_start = rra_begin;
1342 i < rrd.stat_head->rra_cnt;
1343 rra_start +=
1344 rrd.rra_def[i].row_cnt * rrd.stat_head->ds_cnt *
1345 sizeof(rrd_value_t), i++) {
1346 /* is there anything to write for this RRA? If not, continue. */
1347 if (rra_step_cnt[i] == 0)
1348 continue;
1350 /* write the first row */
1351 #ifdef DEBUG
1352 fprintf(stderr, " -- RRA Preseek %ld\n", rrd_file->pos);
1353 #endif
1354 rrd.rra_ptr[i].cur_row++;
1355 if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
1356 rrd.rra_ptr[i].cur_row = 0; /* wrap around */
1357 /* positition on the first row */
1358 rra_pos_tmp = rra_start +
1359 (rrd.stat_head->ds_cnt) * (rrd.rra_ptr[i].cur_row) *
1360 sizeof(rrd_value_t);
1361 if (rra_pos_tmp != rra_current) {
1362 #ifndef HAVE_MMAP
1363 if (rrd_seek(rrd_file, rra_pos_tmp, SEEK_SET) != 0) {
1364 rrd_set_error("seek error in rrd");
1365 break;
1366 }
1367 #endif
1368 rra_current = rra_pos_tmp;
1369 }
1370 #ifdef DEBUG
1371 fprintf(stderr, " -- RRA Postseek %ld\n", rrd_file->pos);
1372 #endif
1373 scratch_idx = CDP_primary_val;
1374 if (pcdp_summary != NULL) {
1375 rra_time = (current_time - current_time
1376 % (rrd.rra_def[i].pdp_cnt *
1377 rrd.stat_head->pdp_step))
1378 -
1379 ((rra_step_cnt[i] -
1380 1) * rrd.rra_def[i].pdp_cnt *
1381 rrd.stat_head->pdp_step);
1382 }
1383 #ifdef HAVE_MMAP
1384 pcdp_summary =
1385 write_RRA_row(&rrd, i, &rra_current, scratch_idx,
1386 rrd_file->fd, pcdp_summary, &rra_time,
1387 rrd_file->file_start);
1388 #else
1389 pcdp_summary =
1390 write_RRA_row(&rrd, i, &rra_current, scratch_idx,
1391 rrd_file->fd, pcdp_summary, &rra_time);
1392 #endif
1393 if (rrd_test_error())
1394 break;
1396 /* write other rows of the bulk update, if any */
1397 scratch_idx = CDP_secondary_val;
1398 for (; rra_step_cnt[i] > 1; rra_step_cnt[i]--) {
1399 if (++rrd.rra_ptr[i].cur_row == rrd.rra_def[i].row_cnt) {
1400 #ifdef DEBUG
1401 fprintf(stderr,
1402 "Wraparound for RRA %s, %lu updates left\n",
1403 rrd.rra_def[i].cf_nam, rra_step_cnt[i] - 1);
1404 #endif
1405 /* wrap */
1406 rrd.rra_ptr[i].cur_row = 0;
1407 /* seek back to beginning of current rra */
1408 if (rrd_seek(rrd_file, rra_start, SEEK_SET) != 0) {
1409 rrd_set_error("seek error in rrd");
1410 break;
1411 }
1412 #ifdef DEBUG
1413 fprintf(stderr, " -- Wraparound Postseek %ld\n",
1414 rrd_file->pos);
1415 #endif
1416 rra_current = rra_start;
1417 }
1418 if (pcdp_summary != NULL) {
1419 rra_time = (current_time - current_time
1420 % (rrd.rra_def[i].pdp_cnt *
1421 rrd.stat_head->pdp_step))
1422 -
1423 ((rra_step_cnt[i] -
1424 2) * rrd.rra_def[i].pdp_cnt *
1425 rrd.stat_head->pdp_step);
1426 }
1427 #ifdef HAVE_MMAP
1428 pcdp_summary =
1429 write_RRA_row(&rrd, i, &rra_current, scratch_idx,
1430 rrd_file->fd, pcdp_summary, &rra_time,
1431 rrd_file->file_start);
1432 #else
1433 pcdp_summary =
1434 write_RRA_row(&rrd, i, &rra_current, scratch_idx,
1435 rrd_file->fd, pcdp_summary, &rra_time);
1436 #endif
1437 }
1439 if (rrd_test_error())
1440 break;
1441 } /* RRA LOOP */
1443 /* break out of the argument parsing loop if error_string is set */
1444 if (rrd_test_error()) {
1445 free(step_start);
1446 break;
1447 }
1449 } /* endif a pdp_st has occurred */
1450 rrd.live_head->last_up = current_time;
1451 rrd.live_head->last_up_usec = current_time_usec;
1452 free(step_start);
1453 } /* function argument loop */
1455 if (seasonal_coef != NULL)
1456 free(seasonal_coef);
1457 if (last_seasonal_coef != NULL)
1458 free(last_seasonal_coef);
1459 if (rra_step_cnt != NULL)
1460 free(rra_step_cnt);
1461 rpnstack_free(&rpnstack);
1463 #ifdef HAVE_MMAP
1464 if (munmap(rrd_file->file_start, rrd_file->file_len) == -1) {
1465 rrd_set_error("error writing(unmapping) file: %s", filename);
1466 }
1467 #endif
1468 /* if we got here and if there is an error and if the file has not been
1469 * written to, then close things up and return. */
1470 if (rrd_test_error()) {
1471 free(updvals);
1472 free(tmpl_idx);
1473 rrd_free(&rrd);
1474 free(pdp_temp);
1475 free(pdp_new);
1476 close(rrd_file->fd);
1477 return (-1);
1478 }
1480 /* aargh ... that was tough ... so many loops ... anyway, its done.
1481 * we just need to write back the live header portion now*/
1483 if (rrd_seek(rrd_file, (sizeof(stat_head_t)
1484 + sizeof(ds_def_t) * rrd.stat_head->ds_cnt
1485 + sizeof(rra_def_t) * rrd.stat_head->rra_cnt),
1486 SEEK_SET) != 0) {
1487 rrd_set_error("seek rrd for live header writeback");
1488 free(updvals);
1489 free(tmpl_idx);
1490 rrd_free(&rrd);
1491 free(pdp_temp);
1492 free(pdp_new);
1493 close(rrd_file->fd);
1494 return (-1);
1495 }
1497 if (version >= 3) {
1498 if (rrd_write(rrd_file, rrd.live_head,
1499 sizeof(live_head_t) * 1) != sizeof(live_head_t) * 1) {
1500 rrd_set_error("rrd_write live_head to rrd");
1501 free(updvals);
1502 rrd_free(&rrd);
1503 free(tmpl_idx);
1504 free(pdp_temp);
1505 free(pdp_new);
1506 close(rrd_file->fd);
1507 return (-1);
1508 }
1509 } else {
1510 if (rrd_write(rrd_file, &rrd.live_head->last_up,
1511 sizeof(time_t) * 1) != sizeof(time_t) * 1) {
1512 rrd_set_error("rrd_write live_head to rrd");
1513 free(updvals);
1514 rrd_free(&rrd);
1515 free(tmpl_idx);
1516 free(pdp_temp);
1517 free(pdp_new);
1518 close(rrd_file->fd);
1519 return (-1);
1520 }
1521 }
1524 if (rrd_write(rrd_file, rrd.pdp_prep,
1525 sizeof(pdp_prep_t) * rrd.stat_head->ds_cnt)
1526 != (ssize_t) (sizeof(pdp_prep_t) * rrd.stat_head->ds_cnt)) {
1527 rrd_set_error("rrd_write pdp_prep to rrd");
1528 free(updvals);
1529 rrd_free(&rrd);
1530 free(tmpl_idx);
1531 free(pdp_temp);
1532 free(pdp_new);
1533 close(rrd_file->fd);
1534 return (-1);
1535 }
1537 if (rrd_write(rrd_file, rrd.cdp_prep,
1538 sizeof(cdp_prep_t) * rrd.stat_head->rra_cnt *
1539 rrd.stat_head->ds_cnt)
1540 != (ssize_t) (sizeof(cdp_prep_t) * rrd.stat_head->rra_cnt *
1541 rrd.stat_head->ds_cnt)) {
1543 rrd_set_error("rrd_write cdp_prep to rrd");
1544 free(updvals);
1545 free(tmpl_idx);
1546 rrd_free(&rrd);
1547 free(pdp_temp);
1548 free(pdp_new);
1549 close(rrd_file->fd);
1550 return (-1);
1551 }
1553 if (rrd_write(rrd_file, rrd.rra_ptr,
1554 sizeof(rra_ptr_t) * rrd.stat_head->rra_cnt)
1555 != (ssize_t) (sizeof(rra_ptr_t) * rrd.stat_head->rra_cnt)) {
1556 rrd_set_error("rrd_write rra_ptr to rrd");
1557 free(updvals);
1558 free(tmpl_idx);
1559 rrd_free(&rrd);
1560 free(pdp_temp);
1561 free(pdp_new);
1562 close(rrd_file->fd);
1563 return (-1);
1564 }
1565 #ifdef HAVE_POSIX_FADVISExxx
1567 /* with update we have write ops, so they will probably not be done by now, this means
1568 the buffers will not get freed. But calling this for the whole file - header
1569 will let the data off the hook as soon as it is written when if it is from a previous
1570 update cycle. Calling fdsync to force things is much too hard here. */
1572 if (0 != posix_fadvise(rrd_file->fd, rra_begin, 0, POSIX_FADV_DONTNEED)) {
1573 rrd_set_error("setting POSIX_FADV_DONTNEED on '%s': %s", filename,
1574 rrd_strerror(errno));
1575 close(rrd_file->fd);
1576 return (-1);
1577 }
1578 #endif
1579 /*XXX: ? */ rrd_flush(rrd_file);
1581 /* calling the smoothing code here guarantees at most
1582 * one smoothing operation per rrd_update call. Unfortunately,
1583 * it is possible with bulk updates, or a long-delayed update
1584 * for smoothing to occur off-schedule. This really isn't
1585 * critical except during the burning cycles. */
1586 if (schedule_smooth) {
1587 // in_file = fopen(filename,"rb+");
1590 rra_start = rra_begin;
1591 for (i = 0; i < rrd.stat_head->rra_cnt; ++i) {
1592 if (cf_conv(rrd.rra_def[i].cf_nam) == CF_DEVSEASONAL ||
1593 cf_conv(rrd.rra_def[i].cf_nam) == CF_SEASONAL) {
1594 #ifdef DEBUG
1595 fprintf(stderr, "Running smoother for rra %ld\n", i);
1596 #endif
1597 apply_smoother(&rrd, i, rra_start, rrd_file);
1598 if (rrd_test_error())
1599 break;
1600 }
1601 rra_start += rrd.rra_def[i].row_cnt
1602 * rrd.stat_head->ds_cnt * sizeof(rrd_value_t);
1603 }
1604 #ifdef HAVE_POSIX_FADVISExxx
1605 /* same procedure as above ... */
1606 if (0 !=
1607 posix_fadvise(rrd_file->fd, rra_begin, 0, POSIX_FADV_DONTNEED)) {
1608 rrd_set_error("setting POSIX_FADV_DONTNEED on '%s': %s", filename,
1609 rrd_strerror(errno));
1610 close(rrd_file->fd);
1611 return (-1);
1612 }
1613 #endif
1614 close(rrd_file->fd);
1615 }
1617 /* OK now close the files and free the memory */
1618 if (close(rrd_file->fd) != 0) {
1619 rrd_set_error("closing rrd");
1620 free(updvals);
1621 free(tmpl_idx);
1622 rrd_free(&rrd);
1623 free(pdp_temp);
1624 free(pdp_new);
1625 return (-1);
1626 }
1628 rrd_free(&rrd);
1629 free(updvals);
1630 free(tmpl_idx);
1631 free(pdp_new);
1632 free(pdp_temp);
1633 return (0);
1634 }
/*
 * Try to take an exclusive lock on the whole file behind the descriptor
 * in_file.  The request does not wait: F_SETLK is used on POSIX systems
 * and _LK_NBLCK on native Windows.
 * The lock is dropped when the file is closed.
 *
 * Returns 0 on success, otherwise the result of the underlying locking
 * call (-1 on Windows if the file size cannot be determined).
 */
int LockRRD(
    int in_file)
{
#if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
    struct _stat file_info;

    /* _locking() needs a byte count, so the file size is looked up first */
    if (_fstat(in_file, &file_info) != 0)
        return (-1);

    return (_locking(in_file, _LK_NBLCK, file_info.st_size));
#else
    struct flock whole_file;

    whole_file.l_whence = SEEK_SET; /* l_start is relative to start of file */
    whole_file.l_start = 0; /* begin at the first byte ... */
    whole_file.l_len = 0;   /* ... and extend to the end of the file */
    whole_file.l_type = F_WRLCK;    /* exclusive write lock */

    return (fcntl(in_file, F_SETLK, &whole_file));
#endif
}
1672 #ifdef HAVE_MMAP
1673 info_t
1682 *write_RRA_row(
1683 rrd_t *rrd,
1684 unsigned long rra_idx,
1685 unsigned long *rra_current,
1686 unsigned short CDP_scratch_idx,
1687 #ifndef DEBUG
1688 int UNUSED(in_file),
1689 #else
1690 int in_file,
1691 #endif
1692 info_t *pcdp_summary,
1693 time_t *rra_time,
1694 void *rrd_mmaped_file)
1695 #else
1696 info_t
1705 *write_RRA_row(
1706 rrd_t *rrd,
1707 unsigned long rra_idx,
1708 unsigned long *rra_current,
1709 unsigned short CDP_scratch_idx,
1710 int in_file,
1711 info_t *pcdp_summary,
1712 time_t *rra_time)
1713 #endif
1714 {
1715 unsigned long ds_idx, cdp_idx;
1716 infoval iv;
1718 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1719 /* compute the cdp index */
1720 cdp_idx = rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
1721 #ifdef DEBUG
1722 fprintf(stderr, " -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1723 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,
1724 rrd_file->pos, rrd->rra_def[rra_idx].cf_nam);
1725 #endif
1726 if (pcdp_summary != NULL) {
1727 iv.u_val = rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
1728 /* append info to the return hash */
1729 pcdp_summary = info_push(pcdp_summary,
1730 sprintf_alloc("[%d]RRA[%s][%lu]DS[%s]",
1731 *rra_time,
1732 rrd->rra_def[rra_idx].
1733 cf_nam,
1734 rrd->rra_def[rra_idx].
1735 pdp_cnt,
1736 rrd->ds_def[ds_idx].
1737 ds_nam), RD_I_VAL, iv);
1738 }
1739 #ifdef HAVE_MMAP
1740 memcpy((char *) rrd_mmaped_file + *rra_current,
1741 &(rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1742 sizeof(rrd_value_t));
1743 #else
1744 if (rrd_write
1745 (rrd_file,
1746 &(rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1747 sizeof(rrd_value_t) * 1) != sizeof(rrd_value_t) * 1) {
1748 rrd_set_error("writing rrd");
1749 return 0;
1750 }
1751 #endif
1752 *rra_current += sizeof(rrd_value_t);
1753 }
1754 return (pcdp_summary);
1755 }