1 /*****************************************************************************
2 * RRDtool 1.2.23 Copyright by Tobi Oetiker, 1997-2007
3 *****************************************************************************
4 * rrd_update.c RRD Update Function
5 *****************************************************************************
6 * $Id$
7 *****************************************************************************/
9 #include "rrd_tool.h"
11 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
12 #include <sys/locking.h>
13 #include <sys/stat.h>
14 #include <io.h>
15 #endif
17 #include "rrd_hw.h"
18 #include "rrd_rpncalc.h"
20 #include "rrd_is_thread_safe.h"
21 #include "unused.h"
23 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
24 /*
25 * WIN32 does not have gettimeofday and struct timeval. This is a quick and dirty
26 * replacement.
27 */
28 #include <sys/timeb.h>
#ifndef __MINGW32__
/* MSVC does not provide struct timeval in this context, so supply the
 * POSIX layout.  MinGW already ships its own, hence the guard. */
struct timeval {
    time_t tv_sec;  /* seconds */
    long tv_usec;   /* microseconds */
};
#endif

/* Timezone argument type for the gettimeofday() replacement below.
 * Named __timezone rather than timezone to avoid colliding with the
 * global `timezone` symbol declared by some C runtimes.
 * NOTE(review): double-underscore names are reserved for the
 * implementation; kept as-is here since callers in this file use it. */
struct __timezone {
    int tz_minuteswest; /* minutes W of Greenwich */
    int tz_dsttime;     /* type of dst correction */
};
42 static int gettimeofday(
43 struct timeval *t,
44 struct __timezone *tz)
45 {
47 struct _timeb current_time;
49 _ftime(¤t_time);
51 t->tv_sec = current_time.time;
52 t->tv_usec = current_time.millitm * 1000;
54 return 0;
55 }
57 #endif
58 /*
59 * normalize time as returned by gettimeofday. usec part must
60 * be always >= 0
61 */
/*
 * Normalize a timeval as returned by gettimeofday so that the
 * microseconds field always lies in [0, 1000000): when it is negative,
 * borrow one second and shift the microseconds up accordingly.
 */
static inline void normalize_time(
    struct timeval *t)
{
    if (t->tv_usec >= 0)
        return;
    t->tv_usec += 1000000L;
    t->tv_sec--;
}
71 static inline info_t *write_RRA_row(
72 rrd_file_t *rrd_file,
73 rrd_t *rrd,
74 unsigned long rra_idx,
75 unsigned long *rra_current,
76 unsigned short CDP_scratch_idx,
77 info_t *pcdp_summary,
78 time_t *rra_time)
79 {
80 unsigned long ds_idx, cdp_idx;
81 infoval iv;
83 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
84 /* compute the cdp index */
85 cdp_idx = rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
86 #ifdef DEBUG
87 fprintf(stderr, " -- RRA WRITE VALUE %e, at %ld CF:%s\n",
88 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,
89 rrd_file->pos, rrd->rra_def[rra_idx].cf_nam);
90 #endif
91 if (pcdp_summary != NULL) {
92 iv.u_val = rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
93 /* append info to the return hash */
94 pcdp_summary = info_push(pcdp_summary,
95 sprintf_alloc("[%d]RRA[%s][%lu]DS[%s]",
96 *rra_time,
97 rrd->rra_def[rra_idx].
98 cf_nam,
99 rrd->rra_def[rra_idx].
100 pdp_cnt,
101 rrd->ds_def[ds_idx].
102 ds_nam), RD_I_VAL, iv);
103 }
104 if (rrd_write
105 (rrd_file,
106 &(rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
107 sizeof(rrd_value_t)) != sizeof(rrd_value_t)) {
108 rrd_set_error("writing rrd: %s", rrd_strerror(errno));
109 return 0;
110 }
111 *rra_current += sizeof(rrd_value_t);
112 }
113 return (pcdp_summary);
114 }
116 int rrd_update_r(
117 const char *filename,
118 const char *tmplt,
119 int argc,
120 const char **argv);
121 int _rrd_update(
122 const char *filename,
123 const char *tmplt,
124 int argc,
125 const char **argv,
126 info_t *);
/* Return X unless it is NaN, in which case return the fallback Y.
 * The trailing semicolon was removed from the expansion: with it, every
 * use expanded to a double statement, and the macro would break inside
 * larger expressions or un-braced if/else branches.  Note X and Y are
 * each evaluated more than once -- do not pass expressions with side
 * effects. */
#define IFDNAN(X,Y) (isnan(X) ? (Y) : (X))
131 info_t *rrd_update_v(
132 int argc,
133 char **argv)
134 {
135 char *tmplt = NULL;
136 info_t *result = NULL;
137 infoval rc;
138 struct option long_options[] = {
139 {"template", required_argument, 0, 't'},
140 {0, 0, 0, 0}
141 };
143 rc.u_int = -1;
144 optind = 0;
145 opterr = 0; /* initialize getopt */
147 while (1) {
148 int option_index = 0;
149 int opt;
151 opt = getopt_long(argc, argv, "t:", long_options, &option_index);
153 if (opt == EOF)
154 break;
156 switch (opt) {
157 case 't':
158 tmplt = optarg;
159 break;
161 case '?':
162 rrd_set_error("unknown option '%s'", argv[optind - 1]);
163 goto end_tag;
164 }
165 }
167 /* need at least 2 arguments: filename, data. */
168 if (argc - optind < 2) {
169 rrd_set_error("Not enough arguments");
170 goto end_tag;
171 }
172 rc.u_int = 0;
173 result = info_push(NULL, sprintf_alloc("return_value"), RD_I_INT, rc);
174 rc.u_int = _rrd_update(argv[optind], tmplt,
175 argc - optind - 1,
176 (const char **) (argv + optind + 1), result);
177 result->value.u_int = rc.u_int;
178 end_tag:
179 return result;
180 }
182 int rrd_update(
183 int argc,
184 char **argv)
185 {
186 struct option long_options[] = {
187 {"template", required_argument, 0, 't'},
188 {0, 0, 0, 0}
189 };
190 int option_index = 0;
191 int opt;
192 char *tmplt = NULL;
193 int rc;
195 optind = 0;
196 opterr = 0; /* initialize getopt */
198 while (1) {
199 opt = getopt_long(argc, argv, "t:", long_options, &option_index);
201 if (opt == EOF)
202 break;
204 switch (opt) {
205 case 't':
206 tmplt = strdup(optarg);
207 break;
209 case '?':
210 rrd_set_error("unknown option '%s'", argv[optind - 1]);
211 return (-1);
212 }
213 }
215 /* need at least 2 arguments: filename, data. */
216 if (argc - optind < 2) {
217 rrd_set_error("Not enough arguments");
219 return -1;
220 }
222 rc = rrd_update_r(argv[optind], tmplt,
223 argc - optind - 1, (const char **) (argv + optind + 1));
224 free(tmplt);
225 return rc;
226 }
228 int rrd_update_r(
229 const char *filename,
230 const char *tmplt,
231 int argc,
232 const char **argv)
233 {
234 return _rrd_update(filename, tmplt, argc, argv, NULL);
235 }
237 int _rrd_update(
238 const char *filename,
239 const char *tmplt,
240 int argc,
241 const char **argv,
242 info_t *pcdp_summary)
243 {
245 int arg_i = 2;
246 short j;
247 unsigned long i, ii, iii = 1;
249 unsigned long rra_begin; /* byte pointer to the rra
250 * area in the rrd file. this
251 * pointer never changes value */
252 unsigned long rra_start; /* byte pointer to the rra
253 * area in the rrd file. this
254 * pointer changes as each rrd is
255 * processed. */
256 unsigned long rra_current; /* byte pointer to the current write
257 * spot in the rrd file. */
258 unsigned long rra_pos_tmp; /* temporary byte pointer. */
259 double interval, pre_int, post_int; /* interval between this and
260 * the last run */
261 unsigned long proc_pdp_st; /* which pdp_st was the last
262 * to be processed */
263 unsigned long occu_pdp_st; /* when was the pdp_st
264 * before the last update
265 * time */
266 unsigned long proc_pdp_age; /* how old was the data in
267 * the pdp prep area when it
268 * was last updated */
269 unsigned long occu_pdp_age; /* how long ago was the last
270 * pdp_step time */
271 rrd_value_t *pdp_new; /* prepare the incoming data
272 * to be added the the
273 * existing entry */
274 rrd_value_t *pdp_temp; /* prepare the pdp values
275 * to be added the the
276 * cdp values */
278 long *tmpl_idx; /* index representing the settings
279 transported by the tmplt index */
280 unsigned long tmpl_cnt = 2; /* time and data */
282 rrd_t rrd;
283 time_t current_time = 0;
284 time_t rra_time = 0; /* time of update for a RRA */
285 unsigned long current_time_usec = 0; /* microseconds part of current time */
286 struct timeval tmp_time; /* used for time conversion */
288 char **updvals;
289 int schedule_smooth = 0;
290 rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
292 /* a vector of future Holt-Winters seasonal coefs */
293 unsigned long elapsed_pdp_st;
295 /* number of elapsed PDP steps since last update */
296 unsigned long *rra_step_cnt = NULL;
298 /* number of rows to be updated in an RRA for a data
299 * value. */
300 unsigned long start_pdp_offset;
302 /* number of PDP steps since the last update that
303 * are assigned to the first CDP to be generated
304 * since the last update. */
305 unsigned short scratch_idx;
307 /* index into the CDP scratch array */
308 enum cf_en current_cf;
310 /* numeric id of the current consolidation function */
311 rpnstack_t rpnstack; /* used for COMPUTE DS */
312 int version; /* rrd version */
313 char *endptr; /* used in the conversion */
314 rrd_file_t *rrd_file;
316 rpnstack_init(&rpnstack);
318 /* need at least 1 arguments: data. */
319 if (argc < 1) {
320 rrd_set_error("Not enough arguments");
321 goto err_out;
322 }
324 rrd_file = rrd_open(filename, &rrd, RRD_READWRITE);
325 if (rrd_file == NULL) {
326 goto err_free;
327 }
328 /* We are now at the beginning of the rra's */
329 rra_current = rra_start = rra_begin = rrd_file->header_len;
331 /* initialize time */
332 version = atoi(rrd.stat_head->version);
333 gettimeofday(&tmp_time, 0);
334 normalize_time(&tmp_time);
335 current_time = tmp_time.tv_sec;
336 if (version >= 3) {
337 current_time_usec = tmp_time.tv_usec;
338 } else {
339 current_time_usec = 0;
340 }
342 /* get exclusive lock to whole file.
343 * lock gets removed when we close the file.
344 */
345 if (LockRRD(rrd_file->fd) != 0) {
346 rrd_set_error("could not lock RRD");
347 goto err_close;
348 }
350 if ((updvals =
351 malloc(sizeof(char *) * (rrd.stat_head->ds_cnt + 1))) == NULL) {
352 rrd_set_error("allocating updvals pointer array");
353 goto err_close;
354 }
356 if ((pdp_temp = malloc(sizeof(rrd_value_t)
357 * rrd.stat_head->ds_cnt)) == NULL) {
358 rrd_set_error("allocating pdp_temp ...");
359 goto err_free_updvals;
360 }
362 if ((tmpl_idx = malloc(sizeof(unsigned long)
363 * (rrd.stat_head->ds_cnt + 1))) == NULL) {
364 rrd_set_error("allocating tmpl_idx ...");
365 goto err_free_pdp_temp;
366 }
367 /* initialize tmplt redirector */
368 /* default config example (assume DS 1 is a CDEF DS)
369 tmpl_idx[0] -> 0; (time)
370 tmpl_idx[1] -> 1; (DS 0)
371 tmpl_idx[2] -> 3; (DS 2)
372 tmpl_idx[3] -> 4; (DS 3) */
373 tmpl_idx[0] = 0; /* time */
374 for (i = 1, ii = 1; i <= rrd.stat_head->ds_cnt; i++) {
375 if (dst_conv(rrd.ds_def[i - 1].dst) != DST_CDEF)
376 tmpl_idx[ii++] = i;
377 }
378 tmpl_cnt = ii;
380 if (tmplt) {
381 /* we should work on a writeable copy here */
382 char *dsname;
383 unsigned int tmpl_len;
384 char *tmplt_copy = strdup(tmplt);
386 dsname = tmplt_copy;
387 tmpl_cnt = 1; /* the first entry is the time */
388 tmpl_len = strlen(tmplt_copy);
389 for (i = 0; i <= tmpl_len; i++) {
390 if (tmplt_copy[i] == ':' || tmplt_copy[i] == '\0') {
391 tmplt_copy[i] = '\0';
392 if (tmpl_cnt > rrd.stat_head->ds_cnt) {
393 rrd_set_error
394 ("tmplt contains more DS definitions than RRD");
395 goto err_free_tmpl_idx;
396 }
397 if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd, dsname)) == -1) {
398 rrd_set_error("unknown DS name '%s'", dsname);
399 goto err_free_tmpl_idx;
400 } else {
401 /* the first element is always the time */
402 tmpl_idx[tmpl_cnt - 1]++;
403 /* go to the next entry on the tmplt_copy */
404 dsname = &tmplt_copy[i + 1];
405 /* fix the damage we did before */
406 if (i < tmpl_len) {
407 tmplt_copy[i] = ':';
408 }
410 }
411 }
412 }
413 free(tmplt_copy);
414 }
415 if ((pdp_new = malloc(sizeof(rrd_value_t)
416 * rrd.stat_head->ds_cnt)) == NULL) {
417 rrd_set_error("allocating pdp_new ...");
418 goto err_free_tmpl_idx;
419 }
420 /* loop through the arguments. */
421 for (arg_i = 0; arg_i < argc; arg_i++) {
422 char *stepper = strdup(argv[arg_i]);
423 char *step_start = stepper;
424 char *p;
425 char *parsetime_error = NULL;
426 enum { atstyle, normal } timesyntax;
427 struct rrd_time_value ds_tv;
429 if (stepper == NULL) {
430 rrd_set_error("failed duplication argv entry");
431 free(step_start);
432 goto err_free_pdp_new;
433 }
434 /* initialize all ds input to unknown except the first one
435 which has always got to be set */
436 for (ii = 1; ii <= rrd.stat_head->ds_cnt; ii++)
437 updvals[ii] = "U";
438 updvals[0] = stepper;
439 /* separate all ds elements; first must be examined separately
440 due to alternate time syntax */
441 if ((p = strchr(stepper, '@')) != NULL) {
442 timesyntax = atstyle;
443 *p = '\0';
444 stepper = p + 1;
445 } else if ((p = strchr(stepper, ':')) != NULL) {
446 timesyntax = normal;
447 *p = '\0';
448 stepper = p + 1;
449 } else {
450 rrd_set_error
451 ("expected timestamp not found in data source from %s",
452 argv[arg_i]);
453 free(step_start);
454 break;
455 }
456 ii = 1;
457 updvals[tmpl_idx[ii]] = stepper;
458 while (*stepper) {
459 if (*stepper == ':') {
460 *stepper = '\0';
461 ii++;
462 if (ii < tmpl_cnt) {
463 updvals[tmpl_idx[ii]] = stepper + 1;
464 }
465 }
466 stepper++;
467 }
469 if (ii != tmpl_cnt - 1) {
470 rrd_set_error
471 ("expected %lu data source readings (got %lu) from %s",
472 tmpl_cnt - 1, ii, argv[arg_i]);
473 free(step_start);
474 break;
475 }
477 /* get the time from the reading ... handle N */
478 if (timesyntax == atstyle) {
479 if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
480 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error);
481 free(step_start);
482 break;
483 }
484 if (ds_tv.type == RELATIVE_TO_END_TIME ||
485 ds_tv.type == RELATIVE_TO_START_TIME) {
486 rrd_set_error("specifying time relative to the 'start' "
487 "or 'end' makes no sense here: %s", updvals[0]);
488 free(step_start);
489 break;
490 }
492 current_time = mktime(&ds_tv.tm) + ds_tv.offset;
494 current_time_usec = 0; /* FIXME: how to handle usecs here ? */
496 } else if (strcmp(updvals[0], "N") == 0) {
497 gettimeofday(&tmp_time, 0);
498 normalize_time(&tmp_time);
499 current_time = tmp_time.tv_sec;
500 current_time_usec = tmp_time.tv_usec;
501 } else {
502 double tmp;
504 tmp = strtod(updvals[0], 0);
505 current_time = floor(tmp);
506 current_time_usec =
507 (long) ((tmp - (double) current_time) * 1000000.0);
508 }
509 /* dont do any correction for old version RRDs */
510 if (version < 3)
511 current_time_usec = 0;
513 if (current_time < rrd.live_head->last_up ||
514 (current_time == rrd.live_head->last_up &&
515 (long) current_time_usec <=
516 (long) rrd.live_head->last_up_usec)) {
517 rrd_set_error("illegal attempt to update using time %ld when "
518 "last update time is %ld (minimum one second step)",
519 current_time, rrd.live_head->last_up);
520 free(step_start);
521 break;
522 }
524 /* seek to the beginning of the rra's */
525 if (rra_current != rra_begin) {
526 #ifndef HAVE_MMAP
527 if (rrd_seek(rrd_file, rra_begin, SEEK_SET) != 0) {
528 rrd_set_error("seek error in rrd");
529 free(step_start);
530 break;
531 }
532 #endif
533 rra_current = rra_begin;
534 }
535 rra_start = rra_begin;
537 /* when was the current pdp started */
538 proc_pdp_age = rrd.live_head->last_up % rrd.stat_head->pdp_step;
539 proc_pdp_st = rrd.live_head->last_up - proc_pdp_age;
541 /* when did the last pdp_st occur */
542 occu_pdp_age = current_time % rrd.stat_head->pdp_step;
543 occu_pdp_st = current_time - occu_pdp_age;
545 /* interval = current_time - rrd.live_head->last_up; */
546 interval = (double) (current_time - rrd.live_head->last_up)
547 + (double) ((long) current_time_usec -
548 (long) rrd.live_head->last_up_usec) / 1000000.0;
550 if (occu_pdp_st > proc_pdp_st) {
551 /* OK we passed the pdp_st moment */
552 pre_int = (long) occu_pdp_st - rrd.live_head->last_up; /* how much of the input data
553 * occurred before the latest
554 * pdp_st moment*/
555 pre_int -= ((double) rrd.live_head->last_up_usec) / 1000000.0; /* adjust usecs */
556 post_int = occu_pdp_age; /* how much after it */
557 post_int += ((double) current_time_usec) / 1000000.0; /* adjust usecs */
558 } else {
559 pre_int = interval;
560 post_int = 0;
561 }
563 #ifdef DEBUG
564 printf("proc_pdp_age %lu\t"
565 "proc_pdp_st %lu\t"
566 "occu_pfp_age %lu\t"
567 "occu_pdp_st %lu\t"
568 "int %lf\t"
569 "pre_int %lf\t"
570 "post_int %lf\n", proc_pdp_age, proc_pdp_st,
571 occu_pdp_age, occu_pdp_st, interval, pre_int, post_int);
572 #endif
574 /* process the data sources and update the pdp_prep
575 * area accordingly */
576 for (i = 0; i < rrd.stat_head->ds_cnt; i++) {
577 enum dst_en dst_idx;
579 dst_idx = dst_conv(rrd.ds_def[i].dst);
581 /* make sure we do not build diffs with old last_ds values */
582 if (rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt < interval) {
583 strncpy(rrd.pdp_prep[i].last_ds, "U", LAST_DS_LEN - 1);
584 rrd.pdp_prep[i].last_ds[LAST_DS_LEN - 1] = '\0';
585 }
587 /* NOTE: DST_CDEF should never enter this if block, because
588 * updvals[i+1][0] is initialized to 'U'; unless the caller
589 * accidently specified a value for the DST_CDEF. To handle
590 * this case, an extra check is required. */
592 if ((updvals[i + 1][0] != 'U') &&
593 (dst_idx != DST_CDEF) &&
594 rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {
595 double rate = DNAN;
597 /* the data source type defines how to process the data */
598 /* pdp_new contains rate * time ... eg the bytes
599 * transferred during the interval. Doing it this way saves
600 * a lot of math operations */
601 switch (dst_idx) {
602 case DST_COUNTER:
603 case DST_DERIVE:
604 if (rrd.pdp_prep[i].last_ds[0] != 'U') {
605 for (ii = 0; updvals[i + 1][ii] != '\0'; ii++) {
606 if ((updvals[i + 1][ii] < '0'
607 || updvals[i + 1][ii] > '9') && (ii != 0
608 && updvals[i
609 +
610 1]
611 [ii] !=
612 '-')) {
613 rrd_set_error("not a simple integer: '%s'",
614 updvals[i + 1]);
615 break;
616 }
617 }
618 if (rrd_test_error()) {
619 break;
620 }
621 pdp_new[i] =
622 rrd_diff(updvals[i + 1], rrd.pdp_prep[i].last_ds);
623 if (dst_idx == DST_COUNTER) {
624 /* simple overflow catcher suggested by Andres Kroonmaa */
625 /* this will fail terribly for non 32 or 64 bit counters ... */
626 /* are there any others in SNMP land ? */
627 if (pdp_new[i] < (double) 0.0)
628 pdp_new[i] += (double) 4294967296.0; /* 2^32 */
629 if (pdp_new[i] < (double) 0.0)
630 pdp_new[i] += (double) 18446744069414584320.0;
631 /* 2^64-2^32 */ ;
632 }
633 rate = pdp_new[i] / interval;
634 } else {
635 pdp_new[i] = DNAN;
636 }
637 break;
638 case DST_ABSOLUTE:
639 errno = 0;
640 pdp_new[i] = strtod(updvals[i + 1], &endptr);
641 if (errno > 0) {
642 rrd_set_error("converting '%s' to float: %s",
643 updvals[i + 1], rrd_strerror(errno));
644 break;
645 };
646 if (endptr[0] != '\0') {
647 rrd_set_error
648 ("conversion of '%s' to float not complete: tail '%s'",
649 updvals[i + 1], endptr);
650 break;
651 }
652 rate = pdp_new[i] / interval;
653 break;
654 case DST_GAUGE:
655 errno = 0;
656 pdp_new[i] = strtod(updvals[i + 1], &endptr) * interval;
657 if (errno > 0) {
658 rrd_set_error("converting '%s' to float: %s",
659 updvals[i + 1], rrd_strerror(errno));
660 break;
661 };
662 if (endptr[0] != '\0') {
663 rrd_set_error
664 ("conversion of '%s' to float not complete: tail '%s'",
665 updvals[i + 1], endptr);
666 break;
667 }
668 rate = pdp_new[i] / interval;
669 break;
670 default:
671 rrd_set_error("rrd contains unknown DS type : '%s'",
672 rrd.ds_def[i].dst);
673 break;
674 }
675 /* break out of this for loop if the error string is set */
676 if (rrd_test_error()) {
677 break;
678 }
679 /* make sure pdp_temp is neither too large or too small
680 * if any of these occur it becomes unknown ...
681 * sorry folks ... */
682 if (!isnan(rate) &&
683 ((!isnan(rrd.ds_def[i].par[DS_max_val].u_val) &&
684 rate > rrd.ds_def[i].par[DS_max_val].u_val) ||
685 (!isnan(rrd.ds_def[i].par[DS_min_val].u_val) &&
686 rate < rrd.ds_def[i].par[DS_min_val].u_val))) {
687 pdp_new[i] = DNAN;
688 }
689 } else {
690 /* no news is news all the same */
691 pdp_new[i] = DNAN;
692 }
695 /* make a copy of the command line argument for the next run */
696 #ifdef DEBUG
697 fprintf(stderr,
698 "prep ds[%lu]\t"
699 "last_arg '%s'\t"
700 "this_arg '%s'\t"
701 "pdp_new %10.2f\n",
702 i, rrd.pdp_prep[i].last_ds, updvals[i + 1], pdp_new[i]);
703 #endif
704 strncpy(rrd.pdp_prep[i].last_ds, updvals[i + 1], LAST_DS_LEN - 1);
705 rrd.pdp_prep[i].last_ds[LAST_DS_LEN - 1] = '\0';
706 }
707 /* break out of the argument parsing loop if the error_string is set */
708 if (rrd_test_error()) {
709 free(step_start);
710 break;
711 }
712 /* has a pdp_st moment occurred since the last run ? */
714 if (proc_pdp_st == occu_pdp_st) {
715 /* no we have not passed a pdp_st moment. therefore update is simple */
717 for (i = 0; i < rrd.stat_head->ds_cnt; i++) {
718 if (isnan(pdp_new[i])) {
719 /* this is not realy accurate if we use subsecond data arival time
720 should have thought of it when going subsecond resolution ...
721 sorry next format change we will have it! */
722 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt +=
723 floor(interval);
724 } else {
725 if (isnan(rrd.pdp_prep[i].scratch[PDP_val].u_val)) {
726 rrd.pdp_prep[i].scratch[PDP_val].u_val = pdp_new[i];
727 } else {
728 rrd.pdp_prep[i].scratch[PDP_val].u_val += pdp_new[i];
729 }
730 }
731 #ifdef DEBUG
732 fprintf(stderr,
733 "NO PDP ds[%lu]\t"
734 "value %10.2f\t"
735 "unkn_sec %5lu\n",
736 i,
737 rrd.pdp_prep[i].scratch[PDP_val].u_val,
738 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
739 #endif
740 }
741 } else {
742 /* an pdp_st has occurred. */
744 /* in pdp_prep[].scratch[PDP_val].u_val we have collected
745 rate*seconds which occurred up to the last run.
746 pdp_new[] contains rate*seconds from the latest run.
747 pdp_temp[] will contain the rate for cdp */
749 for (i = 0; i < rrd.stat_head->ds_cnt; i++) {
750 /* update pdp_prep to the current pdp_st. */
751 double pre_unknown = 0.0;
753 if (isnan(pdp_new[i])) {
754 /* a final bit of unkonwn to be added bevore calculation
755 we use a temporary variable for this so that we
756 don't have to turn integer lines before using the value */
757 pre_unknown = pre_int;
758 } else {
759 if (isnan(rrd.pdp_prep[i].scratch[PDP_val].u_val)) {
760 rrd.pdp_prep[i].scratch[PDP_val].u_val =
761 pdp_new[i] / interval * pre_int;
762 } else {
763 rrd.pdp_prep[i].scratch[PDP_val].u_val +=
764 pdp_new[i] / interval * pre_int;
765 }
766 }
769 /* if too much of the pdp_prep is unknown we dump it */
770 if (
771 /* removed because this does not agree with the
772 definition that a heartbeat can be unknown */
773 /* (rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt
774 > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) || */
775 /* if the interval is larger thatn mrhb we get NAN */
776 (interval > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) ||
777 (occu_pdp_st - proc_pdp_st <=
778 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)) {
779 pdp_temp[i] = DNAN;
780 } else {
781 pdp_temp[i] = rrd.pdp_prep[i].scratch[PDP_val].u_val
782 / ((double) (occu_pdp_st - proc_pdp_st
783 -
784 rrd.pdp_prep[i].
785 scratch[PDP_unkn_sec_cnt].u_cnt)
786 - pre_unknown);
787 }
789 /* process CDEF data sources; remember each CDEF DS can
790 * only reference other DS with a lower index number */
791 if (dst_conv(rrd.ds_def[i].dst) == DST_CDEF) {
792 rpnp_t *rpnp;
794 rpnp =
795 rpn_expand((rpn_cdefds_t *) &
796 (rrd.ds_def[i].par[DS_cdef]));
797 /* substitue data values for OP_VARIABLE nodes */
798 for (ii = 0; rpnp[ii].op != OP_END; ii++) {
799 if (rpnp[ii].op == OP_VARIABLE) {
800 rpnp[ii].op = OP_NUMBER;
801 rpnp[ii].val = pdp_temp[rpnp[ii].ptr];
802 }
803 }
804 /* run the rpn calculator */
805 if (rpn_calc(rpnp, &rpnstack, 0, pdp_temp, i) == -1) {
806 free(rpnp);
807 break; /* exits the data sources pdp_temp loop */
808 }
809 }
811 /* make pdp_prep ready for the next run */
812 if (isnan(pdp_new[i])) {
813 /* this is not realy accurate if we use subsecond data arival time
814 should have thought of it when going subsecond resolution ...
815 sorry next format change we will have it! */
816 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt =
817 floor(post_int);
818 rrd.pdp_prep[i].scratch[PDP_val].u_val = DNAN;
819 } else {
820 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = 0;
821 rrd.pdp_prep[i].scratch[PDP_val].u_val =
822 pdp_new[i] / interval * post_int;
823 }
825 #ifdef DEBUG
826 fprintf(stderr,
827 "PDP UPD ds[%lu]\t"
828 "pdp_temp %10.2f\t"
829 "new_prep %10.2f\t"
830 "new_unkn_sec %5lu\n",
831 i, pdp_temp[i],
832 rrd.pdp_prep[i].scratch[PDP_val].u_val,
833 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
834 #endif
835 }
837 /* if there were errors during the last loop, bail out here */
838 if (rrd_test_error()) {
839 free(step_start);
840 break;
841 }
843 /* compute the number of elapsed pdp_st moments */
844 elapsed_pdp_st =
845 (occu_pdp_st - proc_pdp_st) / rrd.stat_head->pdp_step;
846 #ifdef DEBUG
847 fprintf(stderr, "elapsed PDP steps: %lu\n", elapsed_pdp_st);
848 #endif
849 if (rra_step_cnt == NULL) {
850 rra_step_cnt = (unsigned long *)
851 malloc((rrd.stat_head->rra_cnt) * sizeof(unsigned long));
852 }
854 for (i = 0, rra_start = rra_begin;
855 i < rrd.stat_head->rra_cnt;
856 rra_start +=
857 rrd.rra_def[i].row_cnt * rrd.stat_head->ds_cnt *
858 sizeof(rrd_value_t), i++) {
859 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
860 start_pdp_offset = rrd.rra_def[i].pdp_cnt -
861 (proc_pdp_st / rrd.stat_head->pdp_step) %
862 rrd.rra_def[i].pdp_cnt;
863 if (start_pdp_offset <= elapsed_pdp_st) {
864 rra_step_cnt[i] = (elapsed_pdp_st - start_pdp_offset) /
865 rrd.rra_def[i].pdp_cnt + 1;
866 } else {
867 rra_step_cnt[i] = 0;
868 }
870 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
871 /* If this is a bulk update, we need to skip ahead in
872 the seasonal arrays so that they will be correct for
873 the next observed value;
874 note that for the bulk update itself, no update will
875 occur to DEVSEASONAL or SEASONAL; futhermore, HWPREDICT
876 and DEVPREDICT will be set to DNAN. */
877 if (rra_step_cnt[i] > 2) {
878 /* skip update by resetting rra_step_cnt[i],
879 note that this is not data source specific; this is
880 due to the bulk update, not a DNAN value for the
881 specific data source. */
882 rra_step_cnt[i] = 0;
883 lookup_seasonal(&rrd, i, rra_start, rrd_file,
884 elapsed_pdp_st, &last_seasonal_coef);
885 lookup_seasonal(&rrd, i, rra_start, rrd_file,
886 elapsed_pdp_st + 1, &seasonal_coef);
887 }
889 /* periodically run a smoother for seasonal effects */
890 /* Need to use first cdp parameter buffer to track
891 * burnin (burnin requires a specific smoothing schedule).
892 * The CDP_init_seasonal parameter is really an RRA level,
893 * not a data source within RRA level parameter, but the rra_def
894 * is read only for rrd_update (not flushed to disk). */
895 iii = i * (rrd.stat_head->ds_cnt);
896 if (rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt
897 <= BURNIN_CYCLES) {
898 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
899 > rrd.rra_def[i].row_cnt - 1) {
900 /* mark off one of the burnin cycles */
901 ++(rrd.cdp_prep[iii].scratch[CDP_init_seasonal].
902 u_cnt);
903 schedule_smooth = 1;
904 }
905 } else {
906 /* someone has no doubt invented a trick to deal with this
907 * wrap around, but at least this code is clear. */
908 if (rrd.rra_def[i].par[RRA_seasonal_smooth_idx].
909 u_cnt > rrd.rra_ptr[i].cur_row) {
910 /* here elapsed_pdp_st = rra_step_cnt[i] because of 1-1
911 * mapping between PDP and CDP */
912 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
913 >=
914 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].
915 u_cnt) {
916 #ifdef DEBUG
917 fprintf(stderr,
918 "schedule_smooth 1: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
919 rrd.rra_ptr[i].cur_row,
920 elapsed_pdp_st,
921 rrd.rra_def[i].
922 par[RRA_seasonal_smooth_idx].u_cnt);
923 #endif
924 schedule_smooth = 1;
925 }
926 } else {
927 /* can't rely on negative numbers because we are working with
928 * unsigned values */
929 /* Don't need modulus here. If we've wrapped more than once, only
930 * one smooth is executed at the end. */
931 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st >=
932 rrd.rra_def[i].row_cnt
933 && rrd.rra_ptr[i].cur_row + elapsed_pdp_st -
934 rrd.rra_def[i].row_cnt >=
935 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].
936 u_cnt) {
937 #ifdef DEBUG
938 fprintf(stderr,
939 "schedule_smooth 2: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
940 rrd.rra_ptr[i].cur_row,
941 elapsed_pdp_st,
942 rrd.rra_def[i].
943 par[RRA_seasonal_smooth_idx].u_cnt);
944 #endif
945 schedule_smooth = 1;
946 }
947 }
948 }
950 rra_current = rrd_tell(rrd_file);
951 }
952 /* if cf is DEVSEASONAL or SEASONAL */
953 if (rrd_test_error())
954 break;
956 /* update CDP_PREP areas */
957 /* loop over data soures within each RRA */
958 for (ii = 0; ii < rrd.stat_head->ds_cnt; ii++) {
960 /* iii indexes the CDP prep area for this data source within the RRA */
961 iii = i * rrd.stat_head->ds_cnt + ii;
963 if (rrd.rra_def[i].pdp_cnt > 1) {
965 if (rra_step_cnt[i] > 0) {
966 /* If we are in this block, as least 1 CDP value will be written to
967 * disk, this is the CDP_primary_val entry. If more than 1 value needs
968 * to be written, then the "fill in" value is the CDP_secondary_val
969 * entry. */
970 if (isnan(pdp_temp[ii])) {
971 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
972 u_cnt += start_pdp_offset;
973 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
974 u_val = DNAN;
975 } else {
976 /* CDP_secondary value is the RRA "fill in" value for intermediary
977 * CDP data entries. No matter the CF, the value is the same because
978 * the average, max, min, and last of a list of identical values is
979 * the same, namely, the value itself. */
980 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
981 u_val = pdp_temp[ii];
982 }
984 if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
985 u_cnt >
986 rrd.rra_def[i].pdp_cnt *
987 rrd.rra_def[i].par[RRA_cdp_xff_val].u_val) {
988 rrd.cdp_prep[iii].scratch[CDP_primary_val].
989 u_val = DNAN;
990 /* initialize carry over */
991 if (current_cf == CF_AVERAGE) {
992 if (isnan(pdp_temp[ii])) {
993 rrd.cdp_prep[iii].scratch[CDP_val].
994 u_val = DNAN;
995 } else {
996 rrd.cdp_prep[iii].scratch[CDP_val].
997 u_val =
998 pdp_temp[ii] *
999 ((elapsed_pdp_st -
1000 start_pdp_offset) %
1001 rrd.rra_def[i].pdp_cnt);
1002 }
1003 } else {
1004 rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1005 pdp_temp[ii];
1006 }
1007 } else {
1008 rrd_value_t cum_val, cur_val;
1010 switch (current_cf) {
1011 case CF_AVERAGE:
1012 cum_val =
1013 IFDNAN(rrd.cdp_prep[iii].
1014 scratch[CDP_val].u_val, 0.0);
1015 cur_val = IFDNAN(pdp_temp[ii], 0.0);
1016 rrd.cdp_prep[iii].
1017 scratch[CDP_primary_val].u_val =
1018 (cum_val +
1019 cur_val * start_pdp_offset) /
1020 (rrd.rra_def[i].pdp_cnt -
1021 rrd.cdp_prep[iii].
1022 scratch[CDP_unkn_pdp_cnt].u_cnt);
1023 /* initialize carry over value */
1024 if (isnan(pdp_temp[ii])) {
1025 rrd.cdp_prep[iii].scratch[CDP_val].
1026 u_val = DNAN;
1027 } else {
1028 rrd.cdp_prep[iii].scratch[CDP_val].
1029 u_val =
1030 pdp_temp[ii] *
1031 ((elapsed_pdp_st -
1032 start_pdp_offset) %
1033 rrd.rra_def[i].pdp_cnt);
1034 }
1035 break;
1036 case CF_MAXIMUM:
1037 cum_val =
1038 IFDNAN(rrd.cdp_prep[iii].
1039 scratch[CDP_val].u_val, -DINF);
1040 cur_val = IFDNAN(pdp_temp[ii], -DINF);
1041 #ifdef DEBUG
1042 if (isnan
1043 (rrd.cdp_prep[iii].scratch[CDP_val].
1044 u_val) && isnan(pdp_temp[ii])) {
1045 fprintf(stderr,
1046 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1047 i, ii);
1048 exit(-1);
1049 }
1050 #endif
1051 if (cur_val > cum_val)
1052 rrd.cdp_prep[iii].
1053 scratch[CDP_primary_val].u_val =
1054 cur_val;
1055 else
1056 rrd.cdp_prep[iii].
1057 scratch[CDP_primary_val].u_val =
1058 cum_val;
1059 /* initialize carry over value */
1060 rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1061 pdp_temp[ii];
1062 break;
1063 case CF_MINIMUM:
1064 cum_val =
1065 IFDNAN(rrd.cdp_prep[iii].
1066 scratch[CDP_val].u_val, DINF);
1067 cur_val = IFDNAN(pdp_temp[ii], DINF);
1068 #ifdef DEBUG
1069 if (isnan
1070 (rrd.cdp_prep[iii].scratch[CDP_val].
1071 u_val) && isnan(pdp_temp[ii])) {
1072 fprintf(stderr,
1073 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1074 i, ii);
1075 exit(-1);
1076 }
1077 #endif
1078 if (cur_val < cum_val)
1079 rrd.cdp_prep[iii].
1080 scratch[CDP_primary_val].u_val =
1081 cur_val;
1082 else
1083 rrd.cdp_prep[iii].
1084 scratch[CDP_primary_val].u_val =
1085 cum_val;
1086 /* initialize carry over value */
1087 rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1088 pdp_temp[ii];
1089 break;
1090 case CF_LAST:
1091 default:
1092 rrd.cdp_prep[iii].
1093 scratch[CDP_primary_val].u_val =
1094 pdp_temp[ii];
1095 /* initialize carry over value */
1096 rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1097 pdp_temp[ii];
1098 break;
1099 }
1100 } /* endif meets xff value requirement for a valid value */
1101 /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
1102 * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1103 if (isnan(pdp_temp[ii]))
1104 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
1105 u_cnt =
1106 (elapsed_pdp_st -
1107 start_pdp_offset) %
1108 rrd.rra_def[i].pdp_cnt;
1109 else
1110 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
1111 u_cnt = 0;
1112 } else { /* rra_step_cnt[i] == 0 */
1114 #ifdef DEBUG
1115 if (isnan
1116 (rrd.cdp_prep[iii].scratch[CDP_val].u_val)) {
1117 fprintf(stderr,
1118 "schedule CDP_val update, RRA %lu DS %lu, DNAN\n",
1119 i, ii);
1120 } else {
1121 fprintf(stderr,
1122 "schedule CDP_val update, RRA %lu DS %lu, %10.2f\n",
1123 i, ii,
1124 rrd.cdp_prep[iii].scratch[CDP_val].
1125 u_val);
1126 }
1127 #endif
1128 if (isnan(pdp_temp[ii])) {
1129 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
1130 u_cnt += elapsed_pdp_st;
1131 } else
1132 if (isnan
1133 (rrd.cdp_prep[iii].scratch[CDP_val].
1134 u_val)) {
1135 if (current_cf == CF_AVERAGE) {
1136 rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1137 pdp_temp[ii] * elapsed_pdp_st;
1138 } else {
1139 rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1140 pdp_temp[ii];
1141 }
1142 #ifdef DEBUG
1143 fprintf(stderr,
1144 "Initialize CDP_val for RRA %lu DS %lu: %10.2f\n",
1145 i, ii,
1146 rrd.cdp_prep[iii].scratch[CDP_val].
1147 u_val);
1148 #endif
1149 } else {
1150 switch (current_cf) {
1151 case CF_AVERAGE:
1152 rrd.cdp_prep[iii].scratch[CDP_val].
1153 u_val +=
1154 pdp_temp[ii] * elapsed_pdp_st;
1155 break;
1156 case CF_MINIMUM:
1157 if (pdp_temp[ii] <
1158 rrd.cdp_prep[iii].scratch[CDP_val].
1159 u_val)
1160 rrd.cdp_prep[iii].scratch[CDP_val].
1161 u_val = pdp_temp[ii];
1162 break;
1163 case CF_MAXIMUM:
1164 if (pdp_temp[ii] >
1165 rrd.cdp_prep[iii].scratch[CDP_val].
1166 u_val)
1167 rrd.cdp_prep[iii].scratch[CDP_val].
1168 u_val = pdp_temp[ii];
1169 break;
1170 case CF_LAST:
1171 default:
1172 rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1173 pdp_temp[ii];
1174 break;
1175 }
1176 }
1177 }
1178 } else { /* rrd.rra_def[i].pdp_cnt == 1 */
1179 if (elapsed_pdp_st > 2) {
1180 switch (current_cf) {
1181 case CF_AVERAGE:
1182 default:
1183 rrd.cdp_prep[iii].scratch[CDP_primary_val].
1184 u_val = pdp_temp[ii];
1185 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
1186 u_val = pdp_temp[ii];
1187 break;
1188 case CF_SEASONAL:
1189 case CF_DEVSEASONAL:
1190 /* need to update cached seasonal values, so they are consistent
1191 * with the bulk update */
1192 /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1193 * CDP_last_deviation are the same. */
1194 rrd.cdp_prep[iii].
1195 scratch[CDP_hw_last_seasonal].u_val =
1196 last_seasonal_coef[ii];
1197 rrd.cdp_prep[iii].scratch[CDP_hw_seasonal].
1198 u_val = seasonal_coef[ii];
1199 break;
1200 case CF_HWPREDICT:
1201 case CF_MHWPREDICT:
1202 /* need to update the null_count and last_null_count.
1203 * even do this for non-DNAN pdp_temp because the
1204 * algorithm is not learning from batch updates. */
1205 rrd.cdp_prep[iii].scratch[CDP_null_count].
1206 u_cnt += elapsed_pdp_st;
1207 rrd.cdp_prep[iii].
1208 scratch[CDP_last_null_count].u_cnt +=
1209 elapsed_pdp_st - 1;
1210 /* fall through */
1211 case CF_DEVPREDICT:
1212 rrd.cdp_prep[iii].scratch[CDP_primary_val].
1213 u_val = DNAN;
1214 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
1215 u_val = DNAN;
1216 break;
1217 case CF_FAILURES:
1218 /* do not count missed bulk values as failures */
1219 rrd.cdp_prep[iii].scratch[CDP_primary_val].
1220 u_val = 0;
1221 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
1222 u_val = 0;
1223 /* need to reset violations buffer.
1224 * could do this more carefully, but for now, just
1225 * assume a bulk update wipes away all violations. */
1226 erase_violations(&rrd, iii, i);
1227 break;
1228 }
1229 }
1230 } /* endif rrd.rra_def[i].pdp_cnt == 1 */
1232 if (rrd_test_error())
1233 break;
1235 } /* endif data sources loop */
1236 } /* end RRA Loop */
1238 /* this loop is only entered if elapsed_pdp_st < 3 */
1239 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1240 j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val) {
1241 for (i = 0, rra_start = rra_begin;
1242 i < rrd.stat_head->rra_cnt;
1243 rra_start +=
1244 rrd.rra_def[i].row_cnt * rrd.stat_head->ds_cnt *
1245 sizeof(rrd_value_t), i++) {
1246 if (rrd.rra_def[i].pdp_cnt > 1)
1247 continue;
1249 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
1250 if (current_cf == CF_SEASONAL
1251 || current_cf == CF_DEVSEASONAL) {
1252 lookup_seasonal(&rrd, i, rra_start, rrd_file,
1253 elapsed_pdp_st + (scratch_idx ==
1254 CDP_primary_val ? 1
1255 : 2),
1256 &seasonal_coef);
1257 rra_current = rrd_tell(rrd_file);
1258 }
1259 if (rrd_test_error())
1260 break;
1261 /* loop over data soures within each RRA */
1262 for (ii = 0; ii < rrd.stat_head->ds_cnt; ii++) {
1263 update_aberrant_CF(&rrd, pdp_temp[ii], current_cf,
1264 i * (rrd.stat_head->ds_cnt) + ii,
1265 i, ii, scratch_idx, seasonal_coef);
1266 }
1267 } /* end RRA Loop */
1268 if (rrd_test_error())
1269 break;
1270 } /* end elapsed_pdp_st loop */
1272 if (rrd_test_error())
1273 break;
1275 /* Ready to write to disk */
1276 /* Move sequentially through the file, writing one RRA at a time.
1277 * Note this architecture divorces the computation of CDP with
1278 * flushing updated RRA entries to disk. */
1279 for (i = 0, rra_start = rra_begin;
1280 i < rrd.stat_head->rra_cnt;
1281 rra_start +=
1282 rrd.rra_def[i].row_cnt * rrd.stat_head->ds_cnt *
1283 sizeof(rrd_value_t), i++) {
1284             /* is there anything to write for this RRA? If not, continue. */
1285 if (rra_step_cnt[i] == 0)
1286 continue;
1288 /* write the first row */
1289 #ifdef DEBUG
1290 fprintf(stderr, " -- RRA Preseek %ld\n", rrd_file->pos);
1291 #endif
1292 rrd.rra_ptr[i].cur_row++;
1293 if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
1294 rrd.rra_ptr[i].cur_row = 0; /* wrap around */
1295 /* positition on the first row */
1296 rra_pos_tmp = rra_start +
1297 (rrd.stat_head->ds_cnt) * (rrd.rra_ptr[i].cur_row) *
1298 sizeof(rrd_value_t);
1299 if (rra_pos_tmp != rra_current) {
1300 if (rrd_seek(rrd_file, rra_pos_tmp, SEEK_SET) != 0) {
1301 rrd_set_error("seek error in rrd");
1302 break;
1303 }
1304 rra_current = rra_pos_tmp;
1305 }
1306 #ifdef DEBUG
1307 fprintf(stderr, " -- RRA Postseek %ld\n", rrd_file->pos);
1308 #endif
1309 scratch_idx = CDP_primary_val;
1310 if (pcdp_summary != NULL) {
1311 rra_time = (current_time - current_time
1312 % (rrd.rra_def[i].pdp_cnt *
1313 rrd.stat_head->pdp_step))
1314 -
1315 ((rra_step_cnt[i] -
1316 1) * rrd.rra_def[i].pdp_cnt *
1317 rrd.stat_head->pdp_step);
1318 }
1319 pcdp_summary =
1320 write_RRA_row(rrd_file, &rrd, i, &rra_current,
1321 scratch_idx, pcdp_summary, &rra_time);
1322 if (rrd_test_error())
1323 break;
1325 /* write other rows of the bulk update, if any */
1326 scratch_idx = CDP_secondary_val;
1327 for (; rra_step_cnt[i] > 1; rra_step_cnt[i]--) {
1328 if (++rrd.rra_ptr[i].cur_row == rrd.rra_def[i].row_cnt) {
1329 #ifdef DEBUG
1330 fprintf(stderr,
1331 "Wraparound for RRA %s, %lu updates left\n",
1332 rrd.rra_def[i].cf_nam, rra_step_cnt[i] - 1);
1333 #endif
1334 /* wrap */
1335 rrd.rra_ptr[i].cur_row = 0;
1336 /* seek back to beginning of current rra */
1337 if (rrd_seek(rrd_file, rra_start, SEEK_SET) != 0) {
1338 rrd_set_error("seek error in rrd");
1339 break;
1340 }
1341 #ifdef DEBUG
1342 fprintf(stderr, " -- Wraparound Postseek %ld\n",
1343 rrd_file->pos);
1344 #endif
1345 rra_current = rra_start;
1346 }
1347 if (pcdp_summary != NULL) {
1348 rra_time = (current_time - current_time
1349 % (rrd.rra_def[i].pdp_cnt *
1350 rrd.stat_head->pdp_step))
1351 -
1352 ((rra_step_cnt[i] -
1353 2) * rrd.rra_def[i].pdp_cnt *
1354 rrd.stat_head->pdp_step);
1355 }
1356 pcdp_summary =
1357 write_RRA_row(rrd_file, &rrd, i, &rra_current,
1358 scratch_idx, pcdp_summary, &rra_time);
1359 }
1361 if (rrd_test_error())
1362 break;
1363 } /* RRA LOOP */
1365 /* break out of the argument parsing loop if error_string is set */
1366 if (rrd_test_error()) {
1367 free(step_start);
1368 break;
1369 }
1371 } /* endif a pdp_st has occurred */
1372 rrd.live_head->last_up = current_time;
1373 rrd.live_head->last_up_usec = current_time_usec;
1374 free(step_start);
1375 } /* function argument loop */
1377 if (seasonal_coef != NULL)
1378 free(seasonal_coef);
1379 if (last_seasonal_coef != NULL)
1380 free(last_seasonal_coef);
1381 if (rra_step_cnt != NULL)
1382 free(rra_step_cnt);
1383 rpnstack_free(&rpnstack);
1385 #if 0
1386 //rrd_flush(rrd_file); //XXX: really needed?
1387 #endif
1388 /* if we got here and if there is an error and if the file has not been
1389 * written to, then close things up and return. */
1390 if (rrd_test_error()) {
1391 goto err_free_pdp_new;
1392 }
1394 /* aargh ... that was tough ... so many loops ... anyway, its done.
1395 * we just need to write back the live header portion now*/
1397 if (rrd_seek(rrd_file, (sizeof(stat_head_t)
1398 + sizeof(ds_def_t) * rrd.stat_head->ds_cnt
1399 + sizeof(rra_def_t) * rrd.stat_head->rra_cnt),
1400 SEEK_SET) != 0) {
1401 rrd_set_error("seek rrd for live header writeback");
1402 goto err_free_pdp_new;
1403 }
1404 /* for mmap, we did already write to the underlying mapping, so we do
1405 not need to write again. */
1406 #ifndef HAVE_MMAP
1407 if (version >= 3) {
1408 if (rrd_write(rrd_file, rrd.live_head,
1409 sizeof(live_head_t) * 1) != sizeof(live_head_t) * 1) {
1410 rrd_set_error("rrd_write live_head to rrd");
1411 goto err_free_pdp_new;
1412 }
1413 } else {
1414 if (rrd_write(rrd_file, &rrd.live_head->last_up,
1415 sizeof(time_t) * 1) != sizeof(time_t) * 1) {
1416 rrd_set_error("rrd_write live_head to rrd");
1417 goto err_free_pdp_new;
1418 }
1419 }
1422 if (rrd_write(rrd_file, rrd.pdp_prep,
1423 sizeof(pdp_prep_t) * rrd.stat_head->ds_cnt)
1424 != (ssize_t) (sizeof(pdp_prep_t) * rrd.stat_head->ds_cnt)) {
1425 rrd_set_error("rrd_write pdp_prep to rrd");
1426 goto err_free_pdp_new;
1427 }
1429 if (rrd_write(rrd_file, rrd.cdp_prep,
1430 sizeof(cdp_prep_t) * rrd.stat_head->rra_cnt *
1431 rrd.stat_head->ds_cnt)
1432 != (ssize_t) (sizeof(cdp_prep_t) * rrd.stat_head->rra_cnt *
1433 rrd.stat_head->ds_cnt)) {
1435 rrd_set_error("rrd_write cdp_prep to rrd");
1436 goto err_free_pdp_new;
1437 }
1439 if (rrd_write(rrd_file, rrd.rra_ptr,
1440 sizeof(rra_ptr_t) * rrd.stat_head->rra_cnt)
1441 != (ssize_t) (sizeof(rra_ptr_t) * rrd.stat_head->rra_cnt)) {
1442 rrd_set_error("rrd_write rra_ptr to rrd");
1443 goto err_free_pdp_new;
1444 }
1445 #endif
1447 /* rrd_flush(rrd_file); */
1449 /* calling the smoothing code here guarantees at most
1450 * one smoothing operation per rrd_update call. Unfortunately,
1451 * it is possible with bulk updates, or a long-delayed update
1452 * for smoothing to occur off-schedule. This really isn't
1453 * critical except during the burning cycles. */
1454 if (schedule_smooth) {
1456 rra_start = rra_begin;
1457 for (i = 0; i < rrd.stat_head->rra_cnt; ++i) {
1458 if (cf_conv(rrd.rra_def[i].cf_nam) == CF_DEVSEASONAL ||
1459 cf_conv(rrd.rra_def[i].cf_nam) == CF_SEASONAL) {
1460 #ifdef DEBUG
1461 fprintf(stderr, "Running smoother for rra %ld\n", i);
1462 #endif
1463 apply_smoother(&rrd, i, rra_start, rrd_file);
1464 if (rrd_test_error())
1465 break;
1466 }
1467 rra_start += rrd.rra_def[i].row_cnt
1468 * rrd.stat_head->ds_cnt * sizeof(rrd_value_t);
1469 }
1470 }
1472 rrd_dontneed(rrd_file,&rrd);
1473 rrd_free(&rrd);
1474 rrd_close(rrd_file);
1476 free(pdp_new);
1477 free(tmpl_idx);
1478 free(pdp_temp);
1479 free(updvals);
1480 return (0);
1482 err_free_pdp_new:
1483 free(pdp_new);
1484 err_free_tmpl_idx:
1485 free(tmpl_idx);
1486 err_free_pdp_temp:
1487 free(pdp_temp);
1488 err_free_updvals:
1489 free(updvals);
1490 err_close:
1491 rrd_close(rrd_file);
1492 err_free:
1493 rrd_free(&rrd);
1494 err_out:
1495 return (-1);
1496 }
1498 /*
1499  * get an exclusive lock on the whole file.
1500  * the lock is removed automatically when the file is closed.
1501  *
1502  * returns 0 on success
1503  */
/*
 * Acquire an exclusive (write) lock on the whole file referred to by
 * the open descriptor in_file.  The lock is advisory and is released
 * automatically when the descriptor is closed.
 *
 * Returns 0 on success, a non-zero value (-1 on POSIX) on failure.
 */
int LockRRD(
    int in_file)
{
    int       rcstat;

#if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
    struct _stat st;

    if (_fstat(in_file, &st) == 0) {
        /* non-blocking lock over the file's current extent */
        rcstat = _locking(in_file, _LK_NBLCK, st.st_size);
    } else {
        rcstat = -1;
    }
#else
    /* zero-initialize so no indeterminate bytes (e.g. l_pid) are
     * passed to the kernel */
    struct flock lock = { 0 };

    lock.l_type = F_WRLCK;  /* exclusive write lock */
    lock.l_len = 0;         /* 0 == lock to end of file, i.e. whole file */
    lock.l_start = 0;       /* from the beginning ... */
    lock.l_whence = SEEK_SET;   /* ... of the file (offset relative to start) */

    rcstat = fcntl(in_file, F_SETLK, &lock);
#endif

    return (rcstat);
}