1 /*****************************************************************************
2 * RRDtool 1.2.23 Copyright by Tobi Oetiker, 1997-2007
3 *****************************************************************************
4 * rrd_update.c RRD Update Function
5 *****************************************************************************
6 * $Id$
7 *****************************************************************************/
9 #include "rrd_tool.h"
10 #include <sys/types.h>
11 #include <fcntl.h>
12 #ifdef HAVE_MMAP
13 # include <sys/mman.h>
14 #endif
16 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
17 #include <sys/locking.h>
18 #include <sys/stat.h>
19 #include <io.h>
20 #endif
22 #include "rrd_hw.h"
23 #include "rrd_rpncalc.h"
25 #include "rrd_is_thread_safe.h"
26 #include "unused.h"
28 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
29 /*
30 * WIN32 does not have gettimeofday and struct timeval. This is a quick and dirty
31 * replacement.
32 */
33 #include <sys/timeb.h>
35 #ifndef __MINGW32__
36 struct timeval {
37 time_t tv_sec; /* seconds */
38 long tv_usec; /* microseconds */
39 };
40 #endif
42 struct __timezone {
43 int tz_minuteswest; /* minutes W of Greenwich */
44 int tz_dsttime; /* type of dst correction */
45 };
47 static int gettimeofday(
48 struct timeval *t,
49 struct __timezone *tz)
50 {
52 struct _timeb current_time;
54 _ftime(¤t_time);
56 t->tv_sec = current_time.time;
57 t->tv_usec = current_time.millitm * 1000;
59 return 0;
60 }
62 #endif
/*
 * Normalize a time value as returned by gettimeofday(): the microsecond
 * part must never be negative.  A negative tv_usec is repaired by
 * borrowing one second from tv_sec.  Exactly one second is borrowed, so a
 * tv_usec below -1000000 would still be negative afterwards.
 */
static void normalize_time(
    struct timeval *t)
{
    if (t->tv_usec >= 0)
        return;         /* nothing to repair */

    t->tv_usec += 1000000L;
    t->tv_sec -= 1;
}
76 /* Local prototypes */
77 int LockRRD(
78 int in_file);
80 #ifdef HAVE_MMAP
81 info_t *write_RRA_row(
82 rrd_t *rrd,
83 unsigned long rra_idx,
84 unsigned long *rra_current,
85 unsigned short CDP_scratch_idx,
86 #ifndef DEBUG
87 int UNUSED(in_file),
88 #else
89 int in_file,
90 #endif
91 info_t *pcdp_summary,
92 time_t *rra_time,
93 void *rrd_mmaped_file);
94 #else
95 info_t *write_RRA_row(
96 rrd_t *rrd,
97 unsigned long rra_idx,
98 unsigned long *rra_current,
99 unsigned short CDP_scratch_idx,
100 int in_file,
101 info_t *pcdp_summary,
102 time_t *rra_time);
103 #endif
104 int rrd_update_r(
105 const char *filename,
106 const char *tmplt,
107 int argc,
108 const char **argv);
109 int _rrd_update(
110 const char *filename,
111 const char *tmplt,
112 int argc,
113 const char **argv,
114 info_t *);
/* Yield X unless X is NaN, in which case yield Y.  X may be evaluated
 * twice, so do not pass expressions with side effects.  The expansion
 * carries no trailing semicolon, so it can be used inside expressions. */
#define IFDNAN(X,Y) (isnan(X) ? (Y) : (X))
119 info_t *rrd_update_v(
120 int argc,
121 char **argv)
122 {
123 char *tmplt = NULL;
124 info_t *result = NULL;
125 infoval rc;
127 rc.u_int = -1;
128 optind = 0;
129 opterr = 0; /* initialize getopt */
131 while (1) {
132 static struct option long_options[] = {
133 {"template", required_argument, 0, 't'},
134 {0, 0, 0, 0}
135 };
136 int option_index = 0;
137 int opt;
139 opt = getopt_long(argc, argv, "t:", long_options, &option_index);
141 if (opt == EOF)
142 break;
144 switch (opt) {
145 case 't':
146 tmplt = optarg;
147 break;
149 case '?':
150 rrd_set_error("unknown option '%s'", argv[optind - 1]);
151 goto end_tag;
152 }
153 }
155 /* need at least 2 arguments: filename, data. */
156 if (argc - optind < 2) {
157 rrd_set_error("Not enough arguments");
158 goto end_tag;
159 }
160 rc.u_int = 0;
161 result = info_push(NULL, sprintf_alloc("return_value"), RD_I_INT, rc);
162 rc.u_int = _rrd_update(argv[optind], tmplt,
163 argc - optind - 1,
164 (const char **) (argv + optind + 1), result);
165 result->value.u_int = rc.u_int;
166 end_tag:
167 return result;
168 }
/*
 * Update entry point driven by a command-line style vector.
 *
 * Recognised option: -t / --template <tmplt>.  The first non-option
 * argument is the RRD file name; everything after it is forwarded to
 * rrd_update_r() as update strings.
 *
 * Returns the value of rrd_update_r(), or -1 (with an error recorded
 * through rrd_set_error()) when an option is rejected or fewer than two
 * non-option arguments are present.
 */
int rrd_update(
    int argc,
    char **argv)
{
    static struct option update_options[] = {
        {"template", required_argument, 0, 't'},
        {0, 0, 0, 0}
    };
    char     *ds_template = NULL;
    int       long_idx = 0;
    int       ch;

    /* reset getopt state and silence its own diagnostics */
    opterr = 0;
    optind = 0;

    for (;;) {
        ch = getopt_long(argc, argv, "t:", update_options, &long_idx);
        if (ch == EOF)
            break;

        if (ch == 't') {
            ds_template = optarg;
        } else if (ch == '?') {
            rrd_set_error("unknown option '%s'", argv[optind - 1]);
            return -1;
        }
    }

    /* a file name plus at least one update string must remain */
    if (argc - optind < 2) {
        rrd_set_error("Not enough arguments");
        return -1;
    }

    return rrd_update_r(argv[optind], ds_template,
                        argc - optind - 1,
                        (const char **) (argv + optind + 1));
}
/*
 * Update entry point that takes the file name and template directly
 * instead of parsing an option vector.
 *
 * filename  RRD file to update
 * tmplt     template string, or NULL
 * argc/argv update strings
 *
 * Forwards to _rrd_update() with a NULL pcdp_summary and returns its
 * result unchanged.
 */
int rrd_update_r(
    const char *filename,
    const char *tmplt,
    int argc,
    const char **argv)
{
    int       status;

    status = _rrd_update(filename, tmplt, argc, argv, NULL);
    return status;
}
225 int _rrd_update(
226 const char *filename,
227 const char *tmplt,
228 int argc,
229 const char **argv,
230 info_t *pcdp_summary)
231 {
233 int arg_i = 2;
234 short j;
235 unsigned long i, ii, iii = 1;
237 unsigned long rra_begin; /* byte pointer to the rra
238 * area in the rrd file. this
239 * pointer never changes value */
240 unsigned long rra_start; /* byte pointer to the rra
241 * area in the rrd file. this
242 * pointer changes as each rrd is
243 * processed. */
244 unsigned long rra_current; /* byte pointer to the current write
245 * spot in the rrd file. */
246 unsigned long rra_pos_tmp; /* temporary byte pointer. */
247 double interval, pre_int, post_int; /* interval between this and
248 * the last run */
249 unsigned long proc_pdp_st; /* which pdp_st was the last
250 * to be processed */
251 unsigned long occu_pdp_st; /* when was the pdp_st
252 * before the last update
253 * time */
254 unsigned long proc_pdp_age; /* how old was the data in
255 * the pdp prep area when it
256 * was last updated */
257 unsigned long occu_pdp_age; /* how long ago was the last
258 * pdp_step time */
259 rrd_value_t *pdp_new; /* prepare the incoming data
260 * to be added the the
261 * existing entry */
262 rrd_value_t *pdp_temp; /* prepare the pdp values
263 * to be added the the
264 * cdp values */
266 long *tmpl_idx; /* index representing the settings
267 transported by the tmplt index */
268 unsigned long tmpl_cnt = 2; /* time and data */
270 rrd_t rrd;
271 time_t current_time = 0;
272 time_t rra_time = 0; /* time of update for a RRA */
273 unsigned long current_time_usec = 0; /* microseconds part of current time */
274 struct timeval tmp_time; /* used for time conversion */
276 char **updvals;
277 int schedule_smooth = 0;
278 rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
280 /* a vector of future Holt-Winters seasonal coefs */
281 unsigned long elapsed_pdp_st;
283 /* number of elapsed PDP steps since last update */
284 unsigned long *rra_step_cnt = NULL;
286 /* number of rows to be updated in an RRA for a data
287 * value. */
288 unsigned long start_pdp_offset;
290 /* number of PDP steps since the last update that
291 * are assigned to the first CDP to be generated
292 * since the last update. */
293 unsigned short scratch_idx;
295 /* index into the CDP scratch array */
296 enum cf_en current_cf;
298 /* numeric id of the current consolidation function */
299 rpnstack_t rpnstack; /* used for COMPUTE DS */
300 int version; /* rrd version */
301 char *endptr; /* used in the conversion */
302 rrd_file_t *rrd_file;
304 rpnstack_init(&rpnstack);
306 /* need at least 1 arguments: data. */
307 if (argc < 1) {
308 rrd_set_error("Not enough arguments");
309 return -1;
310 }
312 rrd_file = rrd_open(filename, &rrd, RRD_READWRITE);
313 if (rrd_file == NULL) {
314 return -1;
315 }
317 /* initialize time */
318 version = atoi(rrd.stat_head->version);
319 gettimeofday(&tmp_time, 0);
320 normalize_time(&tmp_time);
321 current_time = tmp_time.tv_sec;
322 if (version >= 3) {
323 current_time_usec = tmp_time.tv_usec;
324 } else {
325 current_time_usec = 0;
326 }
328 rra_current = rra_start = rra_begin = rrd_file->header_len;
329 /* This is defined in the ANSI C standard, section 7.9.5.3:
331 When a file is opened with udpate mode ('+' as the second
332 or third character in the ... list of mode argument
333 variables), both input and output may be performed on the
334 associated stream. However, ... input may not be directly
335 followed by output without an intervening call to a file
336 positioning function, unless the input operation encounters
337 end-of-file. */
338 #if 0 //def HAVE_MMAP
339 rrd_filesize = rrd_file->file_size;
340 fseek(rrd_file->fd, 0, SEEK_END);
341 rrd_filesize = ftell(rrd_file->fd);
342 fseek(rrd_file->fd, rra_current, SEEK_SET);
343 #else
344 // fseek(rrd_file->fd, 0, SEEK_CUR);
345 #endif
348 /* get exclusive lock to whole file.
349 * lock gets removed when we close the file.
350 */
351 if (LockRRD(rrd_file->fd) != 0) {
352 rrd_set_error("could not lock RRD");
353 rrd_free(&rrd);
354 close(rrd_file->fd);
355 return (-1);
356 }
358 if ((updvals =
359 malloc(sizeof(char *) * (rrd.stat_head->ds_cnt + 1))) == NULL) {
360 rrd_set_error("allocating updvals pointer array");
361 rrd_free(&rrd);
362 close(rrd_file->fd);
363 return (-1);
364 }
366 if ((pdp_temp = malloc(sizeof(rrd_value_t)
367 * rrd.stat_head->ds_cnt)) == NULL) {
368 rrd_set_error("allocating pdp_temp ...");
369 free(updvals);
370 rrd_free(&rrd);
371 close(rrd_file->fd);
372 return (-1);
373 }
375 if ((tmpl_idx = malloc(sizeof(unsigned long)
376 * (rrd.stat_head->ds_cnt + 1))) == NULL) {
377 rrd_set_error("allocating tmpl_idx ...");
378 free(pdp_temp);
379 free(updvals);
380 rrd_free(&rrd);
381 close(rrd_file->fd);
382 return (-1);
383 }
384 /* initialize tmplt redirector */
385 /* default config example (assume DS 1 is a CDEF DS)
386 tmpl_idx[0] -> 0; (time)
387 tmpl_idx[1] -> 1; (DS 0)
388 tmpl_idx[2] -> 3; (DS 2)
389 tmpl_idx[3] -> 4; (DS 3) */
390 tmpl_idx[0] = 0; /* time */
391 for (i = 1, ii = 1; i <= rrd.stat_head->ds_cnt; i++) {
392 if (dst_conv(rrd.ds_def[i - 1].dst) != DST_CDEF)
393 tmpl_idx[ii++] = i;
394 }
395 tmpl_cnt = ii;
397 if (tmplt) {
398 /* we should work on a writeable copy here */
399 char *dsname;
400 unsigned int tmpl_len;
401 char *tmplt_copy = strdup(tmplt);
403 dsname = tmplt_copy;
404 tmpl_cnt = 1; /* the first entry is the time */
405 tmpl_len = strlen(tmplt_copy);
406 for (i = 0; i <= tmpl_len; i++) {
407 if (tmplt_copy[i] == ':' || tmplt_copy[i] == '\0') {
408 tmplt_copy[i] = '\0';
409 if (tmpl_cnt > rrd.stat_head->ds_cnt) {
410 rrd_set_error
411 ("tmplt contains more DS definitions than RRD");
412 free(updvals);
413 free(pdp_temp);
414 free(tmpl_idx);
415 rrd_free(&rrd);
416 close(rrd_file->fd);
417 return (-1);
418 }
419 if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd, dsname)) == -1) {
420 rrd_set_error("unknown DS name '%s'", dsname);
421 free(updvals);
422 free(pdp_temp);
423 free(tmplt_copy);
424 free(tmpl_idx);
425 rrd_free(&rrd);
426 close(rrd_file->fd);
427 return (-1);
428 } else {
429 /* the first element is always the time */
430 tmpl_idx[tmpl_cnt - 1]++;
431 /* go to the next entry on the tmplt_copy */
432 dsname = &tmplt_copy[i + 1];
433 /* fix the damage we did before */
434 if (i < tmpl_len) {
435 tmplt_copy[i] = ':';
436 }
438 }
439 }
440 }
441 free(tmplt_copy);
442 }
443 if ((pdp_new = malloc(sizeof(rrd_value_t)
444 * rrd.stat_head->ds_cnt)) == NULL) {
445 rrd_set_error("allocating pdp_new ...");
446 free(updvals);
447 free(pdp_temp);
448 free(tmpl_idx);
449 rrd_free(&rrd);
450 close(rrd_file->fd);
451 return (-1);
452 }
453 #if 0 //def HAVE_MMAP
454 rrd_mmaped_file = mmap(0,
455 rrd_file->file_len,
456 PROT_READ | PROT_WRITE,
457 MAP_SHARED, fileno(in_file), 0);
458 if (rrd_mmaped_file == MAP_FAILED) {
459 rrd_set_error("error mmapping file %s", filename);
460 free(updvals);
461 free(pdp_temp);
462 free(tmpl_idx);
463 rrd_free(&rrd);
464 close(rrd_file->fd);
465 return (-1);
466 }
467 #ifdef USE_MADVISE
468 /* when we use mmaping we tell the kernel the mmap equivalent
469 of POSIX_FADV_RANDOM */
470 madvise(rrd_mmaped_file, rrd_filesize, POSIX_MADV_RANDOM);
471 #endif
472 #endif
473 /* loop through the arguments. */
474 for (arg_i = 0; arg_i < argc; arg_i++) {
475 char *stepper = strdup(argv[arg_i]);
476 char *step_start = stepper;
477 char *p;
478 char *parsetime_error = NULL;
479 enum { atstyle, normal } timesyntax;
480 struct rrd_time_value ds_tv;
482 if (stepper == NULL) {
483 rrd_set_error("failed duplication argv entry");
484 free(step_start);
485 free(updvals);
486 free(pdp_temp);
487 free(tmpl_idx);
488 rrd_free(&rrd);
489 #ifdef HAVE_MMAP
490 rrd_close(rrd_file);
491 #endif
492 close(rrd_file->fd);
493 return (-1);
494 }
495 /* initialize all ds input to unknown except the first one
496 which has always got to be set */
497 for (ii = 1; ii <= rrd.stat_head->ds_cnt; ii++)
498 updvals[ii] = "U";
499 updvals[0] = stepper;
500 /* separate all ds elements; first must be examined separately
501 due to alternate time syntax */
502 if ((p = strchr(stepper, '@')) != NULL) {
503 timesyntax = atstyle;
504 *p = '\0';
505 stepper = p + 1;
506 } else if ((p = strchr(stepper, ':')) != NULL) {
507 timesyntax = normal;
508 *p = '\0';
509 stepper = p + 1;
510 } else {
511 rrd_set_error
512 ("expected timestamp not found in data source from %s",
513 argv[arg_i]);
514 free(step_start);
515 break;
516 }
517 ii = 1;
518 updvals[tmpl_idx[ii]] = stepper;
519 while (*stepper) {
520 if (*stepper == ':') {
521 *stepper = '\0';
522 ii++;
523 if (ii < tmpl_cnt) {
524 updvals[tmpl_idx[ii]] = stepper + 1;
525 }
526 }
527 stepper++;
528 }
530 if (ii != tmpl_cnt - 1) {
531 rrd_set_error
532 ("expected %lu data source readings (got %lu) from %s",
533 tmpl_cnt - 1, ii, argv[arg_i]);
534 free(step_start);
535 break;
536 }
538 /* get the time from the reading ... handle N */
539 if (timesyntax == atstyle) {
540 if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
541 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error);
542 free(step_start);
543 break;
544 }
545 if (ds_tv.type == RELATIVE_TO_END_TIME ||
546 ds_tv.type == RELATIVE_TO_START_TIME) {
547 rrd_set_error("specifying time relative to the 'start' "
548 "or 'end' makes no sense here: %s", updvals[0]);
549 free(step_start);
550 break;
551 }
553 current_time = mktime(&ds_tv.tm) + ds_tv.offset;
554 current_time_usec = 0; /* FIXME: how to handle usecs here ? */
556 } else if (strcmp(updvals[0], "N") == 0) {
557 gettimeofday(&tmp_time, 0);
558 normalize_time(&tmp_time);
559 current_time = tmp_time.tv_sec;
560 current_time_usec = tmp_time.tv_usec;
561 } else {
562 double tmp;
564 tmp = strtod(updvals[0], 0);
565 current_time = floor(tmp);
566 current_time_usec =
567 (long) ((tmp - (double) current_time) * 1000000.0);
568 }
569 /* dont do any correction for old version RRDs */
570 if (version < 3)
571 current_time_usec = 0;
573 if (current_time < rrd.live_head->last_up ||
574 (current_time == rrd.live_head->last_up &&
575 (long) current_time_usec <=
576 (long) rrd.live_head->last_up_usec)) {
577 rrd_set_error("illegal attempt to update using time %ld when "
578 "last update time is %ld (minimum one second step)",
579 current_time, rrd.live_head->last_up);
580 free(step_start);
581 break;
582 }
585 /* seek to the beginning of the rra's */
586 if (rra_current != rra_begin) {
587 #ifndef HAVE_MMAP
588 if (rrd_seek(rrd_file, rra_begin, SEEK_SET) != 0) {
589 rrd_set_error("seek error in rrd");
590 free(step_start);
591 break;
592 }
593 #endif
594 rra_current = rra_begin;
595 }
596 rra_start = rra_begin;
598 /* when was the current pdp started */
599 proc_pdp_age = rrd.live_head->last_up % rrd.stat_head->pdp_step;
600 proc_pdp_st = rrd.live_head->last_up - proc_pdp_age;
602 /* when did the last pdp_st occur */
603 occu_pdp_age = current_time % rrd.stat_head->pdp_step;
604 occu_pdp_st = current_time - occu_pdp_age;
606 /* interval = current_time - rrd.live_head->last_up; */
607 interval = (double) (current_time - rrd.live_head->last_up)
608 + (double) ((long) current_time_usec -
609 (long) rrd.live_head->last_up_usec) / 1000000.0;
611 if (occu_pdp_st > proc_pdp_st) {
612 /* OK we passed the pdp_st moment */
613 pre_int = (long) occu_pdp_st - rrd.live_head->last_up; /* how much of the input data
614 * occurred before the latest
615 * pdp_st moment*/
616 pre_int -= ((double) rrd.live_head->last_up_usec) / 1000000.0; /* adjust usecs */
617 post_int = occu_pdp_age; /* how much after it */
618 post_int += ((double) current_time_usec) / 1000000.0; /* adjust usecs */
619 } else {
620 pre_int = interval;
621 post_int = 0;
622 }
624 #ifdef DEBUG
625 printf("proc_pdp_age %lu\t"
626 "proc_pdp_st %lu\t"
627 "occu_pfp_age %lu\t"
628 "occu_pdp_st %lu\t"
629 "int %lf\t"
630 "pre_int %lf\t"
631 "post_int %lf\n", proc_pdp_age, proc_pdp_st,
632 occu_pdp_age, occu_pdp_st, interval, pre_int, post_int);
633 #endif
635 /* process the data sources and update the pdp_prep
636 * area accordingly */
637 for (i = 0; i < rrd.stat_head->ds_cnt; i++) {
638 enum dst_en dst_idx;
640 dst_idx = dst_conv(rrd.ds_def[i].dst);
642 /* make sure we do not build diffs with old last_ds values */
643 if (rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt < interval) {
644 strncpy(rrd.pdp_prep[i].last_ds, "U", LAST_DS_LEN - 1);
645 rrd.pdp_prep[i].last_ds[LAST_DS_LEN - 1] = '\0';
646 }
648 /* NOTE: DST_CDEF should never enter this if block, because
649 * updvals[i+1][0] is initialized to 'U'; unless the caller
650 * accidently specified a value for the DST_CDEF. To handle
651 * this case, an extra check is required. */
653 if ((updvals[i + 1][0] != 'U') &&
654 (dst_idx != DST_CDEF) &&
655 rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {
656 double rate = DNAN;
658 /* the data source type defines how to process the data */
659 /* pdp_new contains rate * time ... eg the bytes
660 * transferred during the interval. Doing it this way saves
661 * a lot of math operations */
664 switch (dst_idx) {
665 case DST_COUNTER:
666 case DST_DERIVE:
667 if (rrd.pdp_prep[i].last_ds[0] != 'U') {
668 for (ii = 0; updvals[i + 1][ii] != '\0'; ii++) {
669 if ((updvals[i + 1][ii] < '0'
670 || updvals[i + 1][ii] > '9') && (ii != 0
671 && updvals[i
672 +
673 1]
674 [ii] !=
675 '-')) {
676 rrd_set_error("not a simple integer: '%s'",
677 updvals[i + 1]);
678 break;
679 }
680 }
681 if (rrd_test_error()) {
682 break;
683 }
684 pdp_new[i] =
685 rrd_diff(updvals[i + 1], rrd.pdp_prep[i].last_ds);
686 if (dst_idx == DST_COUNTER) {
687 /* simple overflow catcher suggested by Andres Kroonmaa */
688 /* this will fail terribly for non 32 or 64 bit counters ... */
689 /* are there any others in SNMP land ? */
690 if (pdp_new[i] < (double) 0.0)
691 pdp_new[i] += (double) 4294967296.0; /* 2^32 */
692 if (pdp_new[i] < (double) 0.0)
693 pdp_new[i] += (double) 18446744069414584320.0;
694 /* 2^64-2^32 */ ;
695 }
696 rate = pdp_new[i] / interval;
697 } else {
698 pdp_new[i] = DNAN;
699 }
700 break;
701 case DST_ABSOLUTE:
702 errno = 0;
703 pdp_new[i] = strtod(updvals[i + 1], &endptr);
704 if (errno > 0) {
705 rrd_set_error("converting '%s' to float: %s",
706 updvals[i + 1], rrd_strerror(errno));
707 break;
708 };
709 if (endptr[0] != '\0') {
710 rrd_set_error
711 ("conversion of '%s' to float not complete: tail '%s'",
712 updvals[i + 1], endptr);
713 break;
714 }
715 rate = pdp_new[i] / interval;
716 break;
717 case DST_GAUGE:
718 errno = 0;
719 pdp_new[i] = strtod(updvals[i + 1], &endptr) * interval;
720 if (errno > 0) {
721 rrd_set_error("converting '%s' to float: %s",
722 updvals[i + 1], rrd_strerror(errno));
723 break;
724 };
725 if (endptr[0] != '\0') {
726 rrd_set_error
727 ("conversion of '%s' to float not complete: tail '%s'",
728 updvals[i + 1], endptr);
729 break;
730 }
731 rate = pdp_new[i] / interval;
732 break;
733 default:
734 rrd_set_error("rrd contains unknown DS type : '%s'",
735 rrd.ds_def[i].dst);
736 break;
737 }
738 /* break out of this for loop if the error string is set */
739 if (rrd_test_error()) {
740 break;
741 }
742 /* make sure pdp_temp is neither too large or too small
743 * if any of these occur it becomes unknown ...
744 * sorry folks ... */
745 if (!isnan(rate) &&
746 ((!isnan(rrd.ds_def[i].par[DS_max_val].u_val) &&
747 rate > rrd.ds_def[i].par[DS_max_val].u_val) ||
748 (!isnan(rrd.ds_def[i].par[DS_min_val].u_val) &&
749 rate < rrd.ds_def[i].par[DS_min_val].u_val))) {
750 pdp_new[i] = DNAN;
751 }
752 } else {
753 /* no news is news all the same */
754 pdp_new[i] = DNAN;
755 }
758 /* make a copy of the command line argument for the next run */
759 #ifdef DEBUG
760 fprintf(stderr,
761 "prep ds[%lu]\t"
762 "last_arg '%s'\t"
763 "this_arg '%s'\t"
764 "pdp_new %10.2f\n",
765 i, rrd.pdp_prep[i].last_ds, updvals[i + 1], pdp_new[i]);
766 #endif
767 strncpy(rrd.pdp_prep[i].last_ds, updvals[i + 1], LAST_DS_LEN - 1);
768 rrd.pdp_prep[i].last_ds[LAST_DS_LEN - 1] = '\0';
769 }
770 /* break out of the argument parsing loop if the error_string is set */
771 if (rrd_test_error()) {
772 free(step_start);
773 break;
774 }
775 /* has a pdp_st moment occurred since the last run ? */
777 if (proc_pdp_st == occu_pdp_st) {
778 /* no we have not passed a pdp_st moment. therefore update is simple */
780 for (i = 0; i < rrd.stat_head->ds_cnt; i++) {
781 if (isnan(pdp_new[i])) {
782 /* this is not realy accurate if we use subsecond data arival time
783 should have thought of it when going subsecond resolution ...
784 sorry next format change we will have it! */
785 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt +=
786 floor(interval);
787 } else {
788 if (isnan(rrd.pdp_prep[i].scratch[PDP_val].u_val)) {
789 rrd.pdp_prep[i].scratch[PDP_val].u_val = pdp_new[i];
790 } else {
791 rrd.pdp_prep[i].scratch[PDP_val].u_val += pdp_new[i];
792 }
793 }
794 #ifdef DEBUG
795 fprintf(stderr,
796 "NO PDP ds[%lu]\t"
797 "value %10.2f\t"
798 "unkn_sec %5lu\n",
799 i,
800 rrd.pdp_prep[i].scratch[PDP_val].u_val,
801 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
802 #endif
803 }
804 } else {
805 /* an pdp_st has occurred. */
807 /* in pdp_prep[].scratch[PDP_val].u_val we have collected rate*seconds which
808 * occurred up to the last run.
809 pdp_new[] contains rate*seconds from the latest run.
810 pdp_temp[] will contain the rate for cdp */
812 for (i = 0; i < rrd.stat_head->ds_cnt; i++) {
813 /* update pdp_prep to the current pdp_st. */
814 double pre_unknown = 0.0;
816 if (isnan(pdp_new[i]))
817 /* a final bit of unkonwn to be added bevore calculation
818 * we use a tempaorary variable for this so that we
819 * don't have to turn integer lines before using the value */
820 pre_unknown = pre_int;
821 else {
822 if (isnan(rrd.pdp_prep[i].scratch[PDP_val].u_val)) {
823 rrd.pdp_prep[i].scratch[PDP_val].u_val =
824 pdp_new[i] / interval * pre_int;
825 } else {
826 rrd.pdp_prep[i].scratch[PDP_val].u_val +=
827 pdp_new[i] / interval * pre_int;
828 }
829 }
832 /* if too much of the pdp_prep is unknown we dump it */
833 if (
834 /* removed because this does not agree with the definition
835 a heart beat can be unknown */
836 /* (rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt
837 > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) || */
838 /* if the interval is larger thatn mrhb we get NAN */
839 (interval > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) ||
840 (occu_pdp_st - proc_pdp_st <=
841 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)) {
842 pdp_temp[i] = DNAN;
843 } else {
844 pdp_temp[i] = rrd.pdp_prep[i].scratch[PDP_val].u_val
845 / ((double) (occu_pdp_st - proc_pdp_st
846 -
847 rrd.pdp_prep[i].
848 scratch[PDP_unkn_sec_cnt].u_cnt)
849 - pre_unknown);
850 }
852 /* process CDEF data sources; remember each CDEF DS can
853 * only reference other DS with a lower index number */
854 if (dst_conv(rrd.ds_def[i].dst) == DST_CDEF) {
855 rpnp_t *rpnp;
857 rpnp =
858 rpn_expand((rpn_cdefds_t *) &
859 (rrd.ds_def[i].par[DS_cdef]));
860 /* substitue data values for OP_VARIABLE nodes */
861 for (ii = 0; rpnp[ii].op != OP_END; ii++) {
862 if (rpnp[ii].op == OP_VARIABLE) {
863 rpnp[ii].op = OP_NUMBER;
864 rpnp[ii].val = pdp_temp[rpnp[ii].ptr];
865 }
866 }
867 /* run the rpn calculator */
868 if (rpn_calc(rpnp, &rpnstack, 0, pdp_temp, i) == -1) {
869 free(rpnp);
870 break; /* exits the data sources pdp_temp loop */
871 }
872 }
874 /* make pdp_prep ready for the next run */
875 if (isnan(pdp_new[i])) {
876 /* this is not realy accurate if we use subsecond data arival time
877 should have thought of it when going subsecond resolution ...
878 sorry next format change we will have it! */
879 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt =
880 floor(post_int);
881 rrd.pdp_prep[i].scratch[PDP_val].u_val = DNAN;
882 } else {
883 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = 0;
884 rrd.pdp_prep[i].scratch[PDP_val].u_val =
885 pdp_new[i] / interval * post_int;
886 }
888 #ifdef DEBUG
889 fprintf(stderr,
890 "PDP UPD ds[%lu]\t"
891 "pdp_temp %10.2f\t"
892 "new_prep %10.2f\t"
893 "new_unkn_sec %5lu\n",
894 i, pdp_temp[i],
895 rrd.pdp_prep[i].scratch[PDP_val].u_val,
896 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
897 #endif
898 }
900 /* if there were errors during the last loop, bail out here */
901 if (rrd_test_error()) {
902 free(step_start);
903 break;
904 }
906 /* compute the number of elapsed pdp_st moments */
907 elapsed_pdp_st =
908 (occu_pdp_st - proc_pdp_st) / rrd.stat_head->pdp_step;
909 #ifdef DEBUG
910 fprintf(stderr, "elapsed PDP steps: %lu\n", elapsed_pdp_st);
911 #endif
912 if (rra_step_cnt == NULL) {
913 rra_step_cnt = (unsigned long *)
914 malloc((rrd.stat_head->rra_cnt) * sizeof(unsigned long));
915 }
917 for (i = 0, rra_start = rra_begin;
918 i < rrd.stat_head->rra_cnt;
919 rra_start +=
920 rrd.rra_def[i].row_cnt * rrd.stat_head->ds_cnt *
921 sizeof(rrd_value_t), i++) {
922 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
923 start_pdp_offset = rrd.rra_def[i].pdp_cnt -
924 (proc_pdp_st / rrd.stat_head->pdp_step) %
925 rrd.rra_def[i].pdp_cnt;
926 if (start_pdp_offset <= elapsed_pdp_st) {
927 rra_step_cnt[i] = (elapsed_pdp_st - start_pdp_offset) /
928 rrd.rra_def[i].pdp_cnt + 1;
929 } else {
930 rra_step_cnt[i] = 0;
931 }
933 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
934 /* If this is a bulk update, we need to skip ahead in the seasonal
935 * arrays so that they will be correct for the next observed value;
936 * note that for the bulk update itself, no update will occur to
937 * DEVSEASONAL or SEASONAL; futhermore, HWPREDICT and DEVPREDICT will
938 * be set to DNAN. */
939 if (rra_step_cnt[i] > 2) {
940 /* skip update by resetting rra_step_cnt[i],
941 * note that this is not data source specific; this is due
942 * to the bulk update, not a DNAN value for the specific data
943 * source. */
944 rra_step_cnt[i] = 0;
945 lookup_seasonal(&rrd, i, rra_start, rrd_file,
946 elapsed_pdp_st, &last_seasonal_coef);
947 lookup_seasonal(&rrd, i, rra_start, rrd_file,
948 elapsed_pdp_st + 1, &seasonal_coef);
949 }
951 /* periodically run a smoother for seasonal effects */
952 /* Need to use first cdp parameter buffer to track
953 * burnin (burnin requires a specific smoothing schedule).
954 * The CDP_init_seasonal parameter is really an RRA level,
955 * not a data source within RRA level parameter, but the rra_def
956 * is read only for rrd_update (not flushed to disk). */
957 iii = i * (rrd.stat_head->ds_cnt);
958 if (rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt
959 <= BURNIN_CYCLES) {
960 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
961 > rrd.rra_def[i].row_cnt - 1) {
962 /* mark off one of the burnin cycles */
963 ++(rrd.cdp_prep[iii].scratch[CDP_init_seasonal].
964 u_cnt);
965 schedule_smooth = 1;
966 }
967 } else {
968 /* someone has no doubt invented a trick to deal with this
969 * wrap around, but at least this code is clear. */
970 if (rrd.rra_def[i].par[RRA_seasonal_smooth_idx].
971 u_cnt > rrd.rra_ptr[i].cur_row) {
972 /* here elapsed_pdp_st = rra_step_cnt[i] because of 1-1
973 * mapping between PDP and CDP */
974 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
975 >=
976 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].
977 u_cnt) {
978 #ifdef DEBUG
979 fprintf(stderr,
980 "schedule_smooth 1: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
981 rrd.rra_ptr[i].cur_row,
982 elapsed_pdp_st,
983 rrd.rra_def[i].
984 par[RRA_seasonal_smooth_idx].u_cnt);
985 #endif
986 schedule_smooth = 1;
987 }
988 } else {
989 /* can't rely on negative numbers because we are working with
990 * unsigned values */
991 /* Don't need modulus here. If we've wrapped more than once, only
992 * one smooth is executed at the end. */
993 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st >=
994 rrd.rra_def[i].row_cnt
995 && rrd.rra_ptr[i].cur_row + elapsed_pdp_st -
996 rrd.rra_def[i].row_cnt >=
997 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].
998 u_cnt) {
999 #ifdef DEBUG
1000 fprintf(stderr,
1001 "schedule_smooth 2: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
1002 rrd.rra_ptr[i].cur_row,
1003 elapsed_pdp_st,
1004 rrd.rra_def[i].
1005 par[RRA_seasonal_smooth_idx].u_cnt);
1006 #endif
1007 schedule_smooth = 1;
1008 }
1009 }
1010 }
1012 rra_current = rrd_tell(rrd_file);
1013 }
1014 /* if cf is DEVSEASONAL or SEASONAL */
1015 if (rrd_test_error())
1016 break;
1018 /* update CDP_PREP areas */
1019 /* loop over data soures within each RRA */
1020 for (ii = 0; ii < rrd.stat_head->ds_cnt; ii++) {
1022 /* iii indexes the CDP prep area for this data source within the RRA */
1023 iii = i * rrd.stat_head->ds_cnt + ii;
1025 if (rrd.rra_def[i].pdp_cnt > 1) {
1027 if (rra_step_cnt[i] > 0) {
1028 /* If we are in this block, as least 1 CDP value will be written to
1029 * disk, this is the CDP_primary_val entry. If more than 1 value needs
1030 * to be written, then the "fill in" value is the CDP_secondary_val
1031 * entry. */
1032 if (isnan(pdp_temp[ii])) {
1033 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
1034 u_cnt += start_pdp_offset;
1035 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
1036 u_val = DNAN;
1037 } else {
1038 /* CDP_secondary value is the RRA "fill in" value for intermediary
1039 * CDP data entries. No matter the CF, the value is the same because
1040 * the average, max, min, and last of a list of identical values is
1041 * the same, namely, the value itself. */
1042 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
1043 u_val = pdp_temp[ii];
1044 }
1046 if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
1047 u_cnt >
1048 rrd.rra_def[i].pdp_cnt *
1049 rrd.rra_def[i].par[RRA_cdp_xff_val].u_val) {
1050 rrd.cdp_prep[iii].scratch[CDP_primary_val].
1051 u_val = DNAN;
1052 /* initialize carry over */
1053 if (current_cf == CF_AVERAGE) {
1054 if (isnan(pdp_temp[ii])) {
1055 rrd.cdp_prep[iii].scratch[CDP_val].
1056 u_val = DNAN;
1057 } else {
1058 rrd.cdp_prep[iii].scratch[CDP_val].
1059 u_val =
1060 pdp_temp[ii] *
1061 ((elapsed_pdp_st -
1062 start_pdp_offset) %
1063 rrd.rra_def[i].pdp_cnt);
1064 }
1065 } else {
1066 rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1067 pdp_temp[ii];
1068 }
1069 } else {
1070 rrd_value_t cum_val, cur_val;
1072 switch (current_cf) {
1073 case CF_AVERAGE:
1074 cum_val =
1075 IFDNAN(rrd.cdp_prep[iii].
1076 scratch[CDP_val].u_val, 0.0);
1077 cur_val = IFDNAN(pdp_temp[ii], 0.0);
1078 rrd.cdp_prep[iii].
1079 scratch[CDP_primary_val].u_val =
1080 (cum_val +
1081 cur_val * start_pdp_offset) /
1082 (rrd.rra_def[i].pdp_cnt -
1083 rrd.cdp_prep[iii].
1084 scratch[CDP_unkn_pdp_cnt].u_cnt);
1085 /* initialize carry over value */
1086 if (isnan(pdp_temp[ii])) {
1087 rrd.cdp_prep[iii].scratch[CDP_val].
1088 u_val = DNAN;
1089 } else {
1090 rrd.cdp_prep[iii].scratch[CDP_val].
1091 u_val =
1092 pdp_temp[ii] *
1093 ((elapsed_pdp_st -
1094 start_pdp_offset) %
1095 rrd.rra_def[i].pdp_cnt);
1096 }
1097 break;
1098 case CF_MAXIMUM:
1099 cum_val =
1100 IFDNAN(rrd.cdp_prep[iii].
1101 scratch[CDP_val].u_val, -DINF);
1102 cur_val = IFDNAN(pdp_temp[ii], -DINF);
1103 #ifdef DEBUG
1104 if (isnan
1105 (rrd.cdp_prep[iii].scratch[CDP_val].
1106 u_val) && isnan(pdp_temp[ii])) {
1107 fprintf(stderr,
1108 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1109 i, ii);
1110 exit(-1);
1111 }
1112 #endif
1113 if (cur_val > cum_val)
1114 rrd.cdp_prep[iii].
1115 scratch[CDP_primary_val].u_val =
1116 cur_val;
1117 else
1118 rrd.cdp_prep[iii].
1119 scratch[CDP_primary_val].u_val =
1120 cum_val;
1121 /* initialize carry over value */
1122 rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1123 pdp_temp[ii];
1124 break;
1125 case CF_MINIMUM:
1126 cum_val =
1127 IFDNAN(rrd.cdp_prep[iii].
1128 scratch[CDP_val].u_val, DINF);
1129 cur_val = IFDNAN(pdp_temp[ii], DINF);
1130 #ifdef DEBUG
1131 if (isnan
1132 (rrd.cdp_prep[iii].scratch[CDP_val].
1133 u_val) && isnan(pdp_temp[ii])) {
1134 fprintf(stderr,
1135 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1136 i, ii);
1137 exit(-1);
1138 }
1139 #endif
1140 if (cur_val < cum_val)
1141 rrd.cdp_prep[iii].
1142 scratch[CDP_primary_val].u_val =
1143 cur_val;
1144 else
1145 rrd.cdp_prep[iii].
1146 scratch[CDP_primary_val].u_val =
1147 cum_val;
1148 /* initialize carry over value */
1149 rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1150 pdp_temp[ii];
1151 break;
1152 case CF_LAST:
1153 default:
1154 rrd.cdp_prep[iii].
1155 scratch[CDP_primary_val].u_val =
1156 pdp_temp[ii];
1157 /* initialize carry over value */
1158 rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1159 pdp_temp[ii];
1160 break;
1161 }
1162 } /* endif meets xff value requirement for a valid value */
1163 /* initialize carry over CDP_unkn_pdp_cnt; this must happen after CDP_primary_val
1164 * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1165 if (isnan(pdp_temp[ii]))
1166 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
1167 u_cnt =
1168 (elapsed_pdp_st -
1169 start_pdp_offset) %
1170 rrd.rra_def[i].pdp_cnt;
1171 else
1172 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
1173 u_cnt = 0;
1174 } else { /* rra_step_cnt[i] == 0 */
1176 #ifdef DEBUG
1177 if (isnan
1178 (rrd.cdp_prep[iii].scratch[CDP_val].u_val)) {
1179 fprintf(stderr,
1180 "schedule CDP_val update, RRA %lu DS %lu, DNAN\n",
1181 i, ii);
1182 } else {
1183 fprintf(stderr,
1184 "schedule CDP_val update, RRA %lu DS %lu, %10.2f\n",
1185 i, ii,
1186 rrd.cdp_prep[iii].scratch[CDP_val].
1187 u_val);
1188 }
1189 #endif
1190 if (isnan(pdp_temp[ii])) {
1191 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
1192 u_cnt += elapsed_pdp_st;
1193 } else
1194 if (isnan
1195 (rrd.cdp_prep[iii].scratch[CDP_val].
1196 u_val)) {
1197 if (current_cf == CF_AVERAGE) {
1198 rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1199 pdp_temp[ii] * elapsed_pdp_st;
1200 } else {
1201 rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1202 pdp_temp[ii];
1203 }
1204 #ifdef DEBUG
1205 fprintf(stderr,
1206 "Initialize CDP_val for RRA %lu DS %lu: %10.2f\n",
1207 i, ii,
1208 rrd.cdp_prep[iii].scratch[CDP_val].
1209 u_val);
1210 #endif
1211 } else {
1212 switch (current_cf) {
1213 case CF_AVERAGE:
1214 rrd.cdp_prep[iii].scratch[CDP_val].
1215 u_val +=
1216 pdp_temp[ii] * elapsed_pdp_st;
1217 break;
1218 case CF_MINIMUM:
1219 if (pdp_temp[ii] <
1220 rrd.cdp_prep[iii].scratch[CDP_val].
1221 u_val)
1222 rrd.cdp_prep[iii].scratch[CDP_val].
1223 u_val = pdp_temp[ii];
1224 break;
1225 case CF_MAXIMUM:
1226 if (pdp_temp[ii] >
1227 rrd.cdp_prep[iii].scratch[CDP_val].
1228 u_val)
1229 rrd.cdp_prep[iii].scratch[CDP_val].
1230 u_val = pdp_temp[ii];
1231 break;
1232 case CF_LAST:
1233 default:
1234 rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1235 pdp_temp[ii];
1236 break;
1237 }
1238 }
1239 }
1240 } else { /* rrd.rra_def[i].pdp_cnt == 1 */
1241 if (elapsed_pdp_st > 2) {
1242 switch (current_cf) {
1243 case CF_AVERAGE:
1244 default:
1245 rrd.cdp_prep[iii].scratch[CDP_primary_val].
1246 u_val = pdp_temp[ii];
1247 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
1248 u_val = pdp_temp[ii];
1249 break;
1250 case CF_SEASONAL:
1251 case CF_DEVSEASONAL:
1252 /* need to update cached seasonal values, so they are consistent
1253 * with the bulk update */
1254 /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1255 * CDP_last_deviation are the same. */
1256 rrd.cdp_prep[iii].
1257 scratch[CDP_hw_last_seasonal].u_val =
1258 last_seasonal_coef[ii];
1259 rrd.cdp_prep[iii].scratch[CDP_hw_seasonal].
1260 u_val = seasonal_coef[ii];
1261 break;
1262 case CF_HWPREDICT:
1263 /* need to update the null_count and last_null_count.
1264 * even do this for non-DNAN pdp_temp because the
1265 * algorithm is not learning from batch updates. */
1266 rrd.cdp_prep[iii].scratch[CDP_null_count].
1267 u_cnt += elapsed_pdp_st;
1268 rrd.cdp_prep[iii].
1269 scratch[CDP_last_null_count].u_cnt +=
1270 elapsed_pdp_st - 1;
1271 /* fall through */
1272 case CF_DEVPREDICT:
1273 rrd.cdp_prep[iii].scratch[CDP_primary_val].
1274 u_val = DNAN;
1275 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
1276 u_val = DNAN;
1277 break;
1278 case CF_FAILURES:
1279 /* do not count missed bulk values as failures */
1280 rrd.cdp_prep[iii].scratch[CDP_primary_val].
1281 u_val = 0;
1282 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
1283 u_val = 0;
1284 /* need to reset violations buffer.
1285 * could do this more carefully, but for now, just
1286 * assume a bulk update wipes away all violations. */
1287 erase_violations(&rrd, iii, i);
1288 break;
1289 }
1290 }
1291 } /* endif rrd.rra_def[i].pdp_cnt == 1 */
1293 if (rrd_test_error())
1294 break;
1296 } /* endif data sources loop */
1297 } /* end RRA Loop */
1299 /* this loop is only entered if elapsed_pdp_st < 3 */
1300 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1301 j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val) {
1302 for (i = 0, rra_start = rra_begin;
1303 i < rrd.stat_head->rra_cnt;
1304 rra_start +=
1305 rrd.rra_def[i].row_cnt * rrd.stat_head->ds_cnt *
1306 sizeof(rrd_value_t), i++) {
1307 if (rrd.rra_def[i].pdp_cnt > 1)
1308 continue;
1310 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
1311 if (current_cf == CF_SEASONAL
1312 || current_cf == CF_DEVSEASONAL) {
1313 lookup_seasonal(&rrd, i, rra_start, rrd_file,
1314 elapsed_pdp_st + (scratch_idx ==
1315 CDP_primary_val ? 1
1316 : 2),
1317 &seasonal_coef);
1318 rra_current = rrd_tell(rrd_file);
1319 }
1320 if (rrd_test_error())
1321 break;
1322 /* loop over data sources within each RRA */
1323 for (ii = 0; ii < rrd.stat_head->ds_cnt; ii++) {
1324 update_aberrant_CF(&rrd, pdp_temp[ii], current_cf,
1325 i * (rrd.stat_head->ds_cnt) + ii,
1326 i, ii, scratch_idx, seasonal_coef);
1327 }
1328 } /* end RRA Loop */
1329 if (rrd_test_error())
1330 break;
1331 } /* end elapsed_pdp_st loop */
1333 if (rrd_test_error())
1334 break;
1336 /* Ready to write to disk */
1337 /* Move sequentially through the file, writing one RRA at a time.
1338 * Note this architecture divorces the computation of CDP with
1339 * flushing updated RRA entries to disk. */
1340 for (i = 0, rra_start = rra_begin;
1341 i < rrd.stat_head->rra_cnt;
1342 rra_start +=
1343 rrd.rra_def[i].row_cnt * rrd.stat_head->ds_cnt *
1344 sizeof(rrd_value_t), i++) {
1345 /* is there anything to write for this RRA? If not, continue. */
1346 if (rra_step_cnt[i] == 0)
1347 continue;
1349 /* write the first row */
1350 #ifdef DEBUG
1351 fprintf(stderr, " -- RRA Preseek %ld\n", rrd_file->pos);
1352 #endif
1353 rrd.rra_ptr[i].cur_row++;
1354 if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
1355 rrd.rra_ptr[i].cur_row = 0; /* wrap around */
1356 /* position on the first row */
1357 rra_pos_tmp = rra_start +
1358 (rrd.stat_head->ds_cnt) * (rrd.rra_ptr[i].cur_row) *
1359 sizeof(rrd_value_t);
1360 if (rra_pos_tmp != rra_current) {
1361 #ifndef HAVE_MMAP
1362 if (rrd_seek(rrd_file, rra_pos_tmp, SEEK_SET) != 0) {
1363 rrd_set_error("seek error in rrd");
1364 break;
1365 }
1366 #endif
1367 rra_current = rra_pos_tmp;
1368 }
1369 #ifdef DEBUG
1370 fprintf(stderr, " -- RRA Postseek %ld\n", rrd_file->pos);
1371 #endif
1372 scratch_idx = CDP_primary_val;
1373 if (pcdp_summary != NULL) {
1374 rra_time = (current_time - current_time
1375 % (rrd.rra_def[i].pdp_cnt *
1376 rrd.stat_head->pdp_step))
1377 -
1378 ((rra_step_cnt[i] -
1379 1) * rrd.rra_def[i].pdp_cnt *
1380 rrd.stat_head->pdp_step);
1381 }
1382 #ifdef HAVE_MMAP
1383 pcdp_summary =
1384 write_RRA_row(&rrd, i, &rra_current, scratch_idx,
1385 rrd_file->fd, pcdp_summary, &rra_time,
1386 rrd_file->file_start);
1387 #else
1388 pcdp_summary =
1389 write_RRA_row(&rrd, i, &rra_current, scratch_idx,
1390 rrd_file->fd, pcdp_summary, &rra_time);
1391 #endif
1392 if (rrd_test_error())
1393 break;
1395 /* write other rows of the bulk update, if any */
1396 scratch_idx = CDP_secondary_val;
1397 for (; rra_step_cnt[i] > 1; rra_step_cnt[i]--) {
1398 if (++rrd.rra_ptr[i].cur_row == rrd.rra_def[i].row_cnt) {
1399 #ifdef DEBUG
1400 fprintf(stderr,
1401 "Wraparound for RRA %s, %lu updates left\n",
1402 rrd.rra_def[i].cf_nam, rra_step_cnt[i] - 1);
1403 #endif
1404 /* wrap */
1405 rrd.rra_ptr[i].cur_row = 0;
1406 /* seek back to beginning of current rra */
1407 if (rrd_seek(rrd_file, rra_start, SEEK_SET) != 0) {
1408 rrd_set_error("seek error in rrd");
1409 break;
1410 }
1411 #ifdef DEBUG
1412 fprintf(stderr, " -- Wraparound Postseek %ld\n",
1413 rrd_file->pos);
1414 #endif
1415 rra_current = rra_start;
1416 }
1417 if (pcdp_summary != NULL) {
1418 rra_time = (current_time - current_time
1419 % (rrd.rra_def[i].pdp_cnt *
1420 rrd.stat_head->pdp_step))
1421 -
1422 ((rra_step_cnt[i] -
1423 2) * rrd.rra_def[i].pdp_cnt *
1424 rrd.stat_head->pdp_step);
1425 }
1426 #ifdef HAVE_MMAP
1427 pcdp_summary =
1428 write_RRA_row(&rrd, i, &rra_current, scratch_idx,
1429 rrd_file->fd, pcdp_summary, &rra_time,
1430 rrd_file->file_start);
1431 #else
1432 pcdp_summary =
1433 write_RRA_row(&rrd, i, &rra_current, scratch_idx,
1434 rrd_file->fd, pcdp_summary, &rra_time);
1435 #endif
1436 }
1438 if (rrd_test_error())
1439 break;
1440 } /* RRA LOOP */
1442 /* break out of the argument parsing loop if error_string is set */
1443 if (rrd_test_error()) {
1444 free(step_start);
1445 break;
1446 }
1448 } /* endif a pdp_st has occurred */
1449 rrd.live_head->last_up = current_time;
1450 rrd.live_head->last_up_usec = current_time_usec;
1451 free(step_start);
1452 } /* function argument loop */
1454 if (seasonal_coef != NULL)
1455 free(seasonal_coef);
1456 if (last_seasonal_coef != NULL)
1457 free(last_seasonal_coef);
1458 if (rra_step_cnt != NULL)
1459 free(rra_step_cnt);
1460 rpnstack_free(&rpnstack);
1462 #ifdef HAVE_MMAP
1463 if (munmap(rrd_file->file_start, rrd_file->file_len) == -1) {
1464 rrd_set_error("error writing(unmapping) file: %s", filename);
1465 }
1466 #endif
1467 /* if we got here and if there is an error and if the file has not been
1468 * written to, then close things up and return. */
1469 if (rrd_test_error()) {
1470 free(updvals);
1471 free(tmpl_idx);
1472 rrd_free(&rrd);
1473 free(pdp_temp);
1474 free(pdp_new);
1475 close(rrd_file->fd);
1476 return (-1);
1477 }
1479 /* aargh ... that was tough ... so many loops ... anyway, its done.
1480 * we just need to write back the live header portion now*/
1482 if (rrd_seek(rrd_file, (sizeof(stat_head_t)
1483 + sizeof(ds_def_t) * rrd.stat_head->ds_cnt
1484 + sizeof(rra_def_t) * rrd.stat_head->rra_cnt),
1485 SEEK_SET) != 0) {
1486 rrd_set_error("seek rrd for live header writeback");
1487 free(updvals);
1488 free(tmpl_idx);
1489 rrd_free(&rrd);
1490 free(pdp_temp);
1491 free(pdp_new);
1492 close(rrd_file->fd);
1493 return (-1);
1494 }
1496 if (version >= 3) {
1497 if (rrd_write(rrd_file, rrd.live_head,
1498 sizeof(live_head_t) * 1) != sizeof(live_head_t) * 1) {
1499 rrd_set_error("rrd_write live_head to rrd");
1500 free(updvals);
1501 rrd_free(&rrd);
1502 free(tmpl_idx);
1503 free(pdp_temp);
1504 free(pdp_new);
1505 close(rrd_file->fd);
1506 return (-1);
1507 }
1508 } else {
1509 if (rrd_write(rrd_file, &rrd.live_head->last_up,
1510 sizeof(time_t) * 1) != sizeof(time_t) * 1) {
1511 rrd_set_error("rrd_write live_head to rrd");
1512 free(updvals);
1513 rrd_free(&rrd);
1514 free(tmpl_idx);
1515 free(pdp_temp);
1516 free(pdp_new);
1517 close(rrd_file->fd);
1518 return (-1);
1519 }
1520 }
1523 if (rrd_write(rrd_file, rrd.pdp_prep,
1524 sizeof(pdp_prep_t) * rrd.stat_head->ds_cnt)
1525 != (ssize_t) (sizeof(pdp_prep_t) * rrd.stat_head->ds_cnt)) {
1526 rrd_set_error("rrd_write pdp_prep to rrd");
1527 free(updvals);
1528 rrd_free(&rrd);
1529 free(tmpl_idx);
1530 free(pdp_temp);
1531 free(pdp_new);
1532 close(rrd_file->fd);
1533 return (-1);
1534 }
1536 if (rrd_write(rrd_file, rrd.cdp_prep,
1537 sizeof(cdp_prep_t) * rrd.stat_head->rra_cnt *
1538 rrd.stat_head->ds_cnt)
1539 != (ssize_t) (sizeof(cdp_prep_t) * rrd.stat_head->rra_cnt *
1540 rrd.stat_head->ds_cnt)) {
1542 rrd_set_error("rrd_write cdp_prep to rrd");
1543 free(updvals);
1544 free(tmpl_idx);
1545 rrd_free(&rrd);
1546 free(pdp_temp);
1547 free(pdp_new);
1548 close(rrd_file->fd);
1549 return (-1);
1550 }
1552 if (rrd_write(rrd_file, rrd.rra_ptr,
1553 sizeof(rra_ptr_t) * rrd.stat_head->rra_cnt)
1554 != (ssize_t) (sizeof(rra_ptr_t) * rrd.stat_head->rra_cnt)) {
1555 rrd_set_error("rrd_write rra_ptr to rrd");
1556 free(updvals);
1557 free(tmpl_idx);
1558 rrd_free(&rrd);
1559 free(pdp_temp);
1560 free(pdp_new);
1561 close(rrd_file->fd);
1562 return (-1);
1563 }
1564 #ifdef HAVE_POSIX_FADVISExxx
1566 /* with update we have write ops, so they will probably not be done by now, this means
1567 the buffers will not get freed. But calling this for the whole file - header
1568 will let the data off the hook as soon as it is written when if it is from a previous
1569 update cycle. Calling fdsync to force things is much too hard here. */
1571 if (0 != posix_fadvise(rrd_file->fd, rra_begin, 0, POSIX_FADV_DONTNEED)) {
1572 rrd_set_error("setting POSIX_FADV_DONTNEED on '%s': %s", filename,
1573 rrd_strerror(errno));
1574 close(rrd_file->fd);
1575 return (-1);
1576 }
1577 #endif
1578 /*XXX: ? */ rrd_flush(rrd_file);
1580 /* calling the smoothing code here guarantees at most
1581 * one smoothing operation per rrd_update call. Unfortunately,
1582 * it is possible with bulk updates, or a long-delayed update
1583 * for smoothing to occur off-schedule. This really isn't
1584 * critical except during the burning cycles. */
1585 if (schedule_smooth) {
1586 // in_file = fopen(filename,"rb+");
1589 rra_start = rra_begin;
1590 for (i = 0; i < rrd.stat_head->rra_cnt; ++i) {
1591 if (cf_conv(rrd.rra_def[i].cf_nam) == CF_DEVSEASONAL ||
1592 cf_conv(rrd.rra_def[i].cf_nam) == CF_SEASONAL) {
1593 #ifdef DEBUG
1594 fprintf(stderr, "Running smoother for rra %ld\n", i);
1595 #endif
1596 apply_smoother(&rrd, i, rra_start, rrd_file);
1597 if (rrd_test_error())
1598 break;
1599 }
1600 rra_start += rrd.rra_def[i].row_cnt
1601 * rrd.stat_head->ds_cnt * sizeof(rrd_value_t);
1602 }
1603 #ifdef HAVE_POSIX_FADVISExxx
1604 /* same procedure as above ... */
1605 if (0 !=
1606 posix_fadvise(rrd_file->fd, rra_begin, 0, POSIX_FADV_DONTNEED)) {
1607 rrd_set_error("setting POSIX_FADV_DONTNEED on '%s': %s", filename,
1608 rrd_strerror(errno));
1609 close(rrd_file->fd);
1610 return (-1);
1611 }
1612 #endif
1613 close(rrd_file->fd);
1614 }
1616 /* OK now close the files and free the memory */
1617 if (close(rrd_file->fd) != 0) {
1618 rrd_set_error("closing rrd");
1619 free(updvals);
1620 free(tmpl_idx);
1621 rrd_free(&rrd);
1622 free(pdp_temp);
1623 free(pdp_new);
1624 return (-1);
1625 }
1627 rrd_free(&rrd);
1628 free(updvals);
1629 free(tmpl_idx);
1630 free(pdp_new);
1631 free(pdp_temp);
1632 return (0);
1633 }
/*
 * Request an exclusive lock on the file behind descriptor in_file.
 * The lock goes away when the file is closed.
 *
 * in_file  open file descriptor of the file to lock
 *
 * returns 0 on success; otherwise the result of the underlying locking
 * call is handed back unchanged (-1 if the file size cannot be read on
 * Windows).
 */
int LockRRD(
    int in_file)
{
#if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
    struct _stat file_info;

    /* the byte count passed to _locking() is the file size, so the
     * size lookup has to succeed first */
    if (_fstat(in_file, &file_info) != 0)
        return (-1);

    return (_locking(in_file, _LK_NBLCK, file_info.st_size));
#else
    struct flock whole_file;

    whole_file.l_whence = SEEK_SET; /* l_start counts from the start of the file */
    whole_file.l_start = 0; /* begin at the first byte */
    whole_file.l_len = 0;   /* whole file */
    whole_file.l_type = F_WRLCK;    /* exclusive write lock */

    return (fcntl(in_file, F_SETLK, &whole_file));
#endif
}
1671 #ifdef HAVE_MMAP
1672 info_t
1681 *write_RRA_row(
1682 rrd_t *rrd,
1683 unsigned long rra_idx,
1684 unsigned long *rra_current,
1685 unsigned short CDP_scratch_idx,
1686 #ifndef DEBUG
1687 int UNUSED(in_file),
1688 #else
1689 int in_file,
1690 #endif
1691 info_t *pcdp_summary,
1692 time_t *rra_time,
1693 void *rrd_mmaped_file)
1694 #else
1695 info_t
1704 *write_RRA_row(
1705 rrd_t *rrd,
1706 unsigned long rra_idx,
1707 unsigned long *rra_current,
1708 unsigned short CDP_scratch_idx,
1709 int in_file,
1710 info_t *pcdp_summary,
1711 time_t *rra_time)
1712 #endif
1713 {
1714 unsigned long ds_idx, cdp_idx;
1715 infoval iv;
1717 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
1718 /* compute the cdp index */
1719 cdp_idx = rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
1720 #ifdef DEBUG
1721 fprintf(stderr, " -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1722 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,
1723 rrd_file->pos, rrd->rra_def[rra_idx].cf_nam);
1724 #endif
1725 if (pcdp_summary != NULL) {
1726 iv.u_val = rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
1727 /* append info to the return hash */
1728 pcdp_summary = info_push(pcdp_summary,
1729 sprintf_alloc("[%d]RRA[%s][%lu]DS[%s]",
1730 *rra_time,
1731 rrd->rra_def[rra_idx].
1732 cf_nam,
1733 rrd->rra_def[rra_idx].
1734 pdp_cnt,
1735 rrd->ds_def[ds_idx].
1736 ds_nam), RD_I_VAL, iv);
1737 }
1738 #ifdef HAVE_MMAP
1739 memcpy((char *) rrd_mmaped_file + *rra_current,
1740 &(rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1741 sizeof(rrd_value_t));
1742 #else
1743 if (rrd_write
1744 (rrd_file,
1745 &(rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1746 sizeof(rrd_value_t) * 1) != sizeof(rrd_value_t) * 1) {
1747 rrd_set_error("writing rrd");
1748 return 0;
1749 }
1750 #endif
1751 *rra_current += sizeof(rrd_value_t);
1752 }
1753 return (pcdp_summary);
1754 }