1 /*****************************************************************************
2 * RRDtool 1.2.99907080300 Copyright by Tobi Oetiker, 1997-2007
3 *****************************************************************************
4 * rrd_update.c RRD Update Function
5 *****************************************************************************
6 * $Id$
7 *****************************************************************************/
9 #include "rrd_tool.h"
11 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
12 #include <sys/locking.h>
13 #include <sys/stat.h>
14 #include <io.h>
15 #endif
17 #include <locale.h>
19 #include "rrd_hw.h"
20 #include "rrd_rpncalc.h"
22 #include "rrd_is_thread_safe.h"
23 #include "unused.h"
#if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
/*
 * WIN32 does not have gettimeofday and struct timeval. This is a quick and dirty
 * replacement.
 */
#include <sys/timeb.h>

#ifndef __MINGW32__
struct timeval {
    time_t    tv_sec;   /* seconds */
    long      tv_usec;  /* microseconds */
};
#endif

struct __timezone {
    int       tz_minuteswest;   /* minutes W of Greenwich */
    int       tz_dsttime;   /* type of dst correction */
};

/*
 * Stand-in for gettimeofday() built on _ftime().
 *
 *  t   receives the current time; tv_usec is derived from the millisecond
 *      field of struct _timeb, so it is always a multiple of 1000
 *  tz  accepted for signature compatibility only; never read or written
 *
 * Always returns 0.
 */
static int gettimeofday(
    struct timeval *t,
    struct __timezone *tz)
{
    struct _timeb current_time;

    (void) tz;          /* no time zone information is produced here */

    _ftime(&current_time);

    t->tv_sec = current_time.time;
    t->tv_usec = current_time.millitm * 1000;

    return 0;
}

#endif
/*
 * Canonicalise a timestamp obtained from gettimeofday(): when the
 * microsecond field is negative, borrow one whole second from tv_sec so
 * that tv_usec ends up non-negative.  Timestamps whose tv_usec is already
 * >= 0 are left untouched.
 */
static inline void normalize_time(
    struct timeval *t)
{
    enum { USEC_PER_SEC = 1000000 };

    /* nothing to borrow when the fractional part is already non-negative */
    if (t->tv_usec >= 0)
        return;

    t->tv_usec = t->tv_usec + USEC_PER_SEC;
    t->tv_sec = t->tv_sec - 1;
}
73 static inline info_t *write_RRA_row(
74 rrd_file_t *rrd_file,
75 rrd_t *rrd,
76 unsigned long rra_idx,
77 unsigned long *rra_current,
78 unsigned short CDP_scratch_idx,
79 info_t *pcdp_summary,
80 time_t *rra_time)
81 {
82 unsigned long ds_idx, cdp_idx;
83 infoval iv;
85 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
86 /* compute the cdp index */
87 cdp_idx = rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
88 #ifdef DEBUG
89 fprintf(stderr, " -- RRA WRITE VALUE %e, at %ld CF:%s\n",
90 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,
91 rrd_file->pos, rrd->rra_def[rra_idx].cf_nam);
92 #endif
93 if (pcdp_summary != NULL) {
94 iv.u_val = rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
95 /* append info to the return hash */
96 pcdp_summary = info_push(pcdp_summary,
97 sprintf_alloc("[%d]RRA[%s][%lu]DS[%s]",
98 *rra_time,
99 rrd->rra_def[rra_idx].
100 cf_nam,
101 rrd->rra_def[rra_idx].
102 pdp_cnt,
103 rrd->ds_def[ds_idx].
104 ds_nam), RD_I_VAL, iv);
105 }
106 if (rrd_write
107 (rrd_file,
108 &(rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
109 sizeof(rrd_value_t)) != sizeof(rrd_value_t)) {
110 rrd_set_error("writing rrd: %s", rrd_strerror(errno));
111 return 0;
112 }
113 *rra_current += sizeof(rrd_value_t);
114 }
115 return (pcdp_summary);
116 }
118 int rrd_update_r(
119 const char *filename,
120 const char *tmplt,
121 int argc,
122 const char **argv);
123 int _rrd_update(
124 const char *filename,
125 const char *tmplt,
126 int argc,
127 const char **argv,
128 info_t *);
/*
 * IFDNAN(X, Y): evaluates to Y when X is NaN, otherwise to X.
 *
 * X is evaluated twice when it is not NaN, so do not pass expressions
 * with side effects.  The expansion has no trailing semicolon, which
 * lets the macro be used inside a larger expression as well as in a
 * plain assignment statement.
 */
#define IFDNAN(X,Y) (isnan(X) ? (Y) : (X))
133 info_t *rrd_update_v(
134 int argc,
135 char **argv)
136 {
137 char *tmplt = NULL;
138 info_t *result = NULL;
139 infoval rc;
140 struct option long_options[] = {
141 {"template", required_argument, 0, 't'},
142 {0, 0, 0, 0}
143 };
145 rc.u_int = -1;
146 optind = 0;
147 opterr = 0; /* initialize getopt */
149 while (1) {
150 int option_index = 0;
151 int opt;
153 opt = getopt_long(argc, argv, "t:", long_options, &option_index);
155 if (opt == EOF)
156 break;
158 switch (opt) {
159 case 't':
160 tmplt = optarg;
161 break;
163 case '?':
164 rrd_set_error("unknown option '%s'", argv[optind - 1]);
165 goto end_tag;
166 }
167 }
169 /* need at least 2 arguments: filename, data. */
170 if (argc - optind < 2) {
171 rrd_set_error("Not enough arguments");
172 goto end_tag;
173 }
174 rc.u_int = 0;
175 result = info_push(NULL, sprintf_alloc("return_value"), RD_I_INT, rc);
176 rc.u_int = _rrd_update(argv[optind], tmplt,
177 argc - optind - 1,
178 (const char **) (argv + optind + 1), result);
179 result->value.u_int = rc.u_int;
180 end_tag:
181 return result;
182 }
184 int rrd_update(
185 int argc,
186 char **argv)
187 {
188 struct option long_options[] = {
189 {"template", required_argument, 0, 't'},
190 {0, 0, 0, 0}
191 };
192 int option_index = 0;
193 int opt;
194 char *tmplt = NULL;
195 int rc;
197 optind = 0;
198 opterr = 0; /* initialize getopt */
200 while (1) {
201 opt = getopt_long(argc, argv, "t:", long_options, &option_index);
203 if (opt == EOF)
204 break;
206 switch (opt) {
207 case 't':
208 tmplt = strdup(optarg);
209 break;
211 case '?':
212 rrd_set_error("unknown option '%s'", argv[optind - 1]);
213 return (-1);
214 }
215 }
217 /* need at least 2 arguments: filename, data. */
218 if (argc - optind < 2) {
219 rrd_set_error("Not enough arguments");
221 return -1;
222 }
224 rc = rrd_update_r(argv[optind], tmplt,
225 argc - optind - 1, (const char **) (argv + optind + 1));
226 free(tmplt);
227 return rc;
228 }
/*
 * Update entry point taking the file name, the optional template and the
 * data arguments as separate parameters.  Everything is forwarded to
 * _rrd_update() with a NULL summary list, and that function's return
 * value is handed back unchanged.
 */
int rrd_update_r(
    const char *filename,
    const char *tmplt,
    int argc,
    const char **argv)
{
    const int status = _rrd_update(filename, tmplt, argc, argv, NULL);

    return status;
}
239 int _rrd_update(
240 const char *filename,
241 const char *tmplt,
242 int argc,
243 const char **argv,
244 info_t *pcdp_summary)
245 {
247 int arg_i = 2;
248 short j;
249 unsigned long i, ii, iii = 1;
251 unsigned long rra_begin; /* byte pointer to the rra
252 * area in the rrd file. this
253 * pointer never changes value */
254 unsigned long rra_start; /* byte pointer to the rra
255 * area in the rrd file. this
256 * pointer changes as each rrd is
257 * processed. */
258 unsigned long rra_current; /* byte pointer to the current write
259 * spot in the rrd file. */
260 unsigned long rra_pos_tmp; /* temporary byte pointer. */
261 double interval, pre_int, post_int; /* interval between this and
262 * the last run */
263 unsigned long proc_pdp_st; /* which pdp_st was the last
264 * to be processed */
265 unsigned long occu_pdp_st; /* when was the pdp_st
266 * before the last update
267 * time */
268 unsigned long proc_pdp_age; /* how old was the data in
269 * the pdp prep area when it
270 * was last updated */
271 unsigned long occu_pdp_age; /* how long ago was the last
272 * pdp_step time */
273 rrd_value_t *pdp_new; /* prepare the incoming data
274 * to be added the the
275 * existing entry */
276 rrd_value_t *pdp_temp; /* prepare the pdp values
277 * to be added the the
278 * cdp values */
280 long *tmpl_idx; /* index representing the settings
281 transported by the tmplt index */
282 unsigned long tmpl_cnt = 2; /* time and data */
284 rrd_t rrd;
285 time_t current_time = 0;
286 time_t rra_time = 0; /* time of update for a RRA */
287 unsigned long current_time_usec = 0; /* microseconds part of current time */
288 struct timeval tmp_time; /* used for time conversion */
290 char **updvals;
291 int schedule_smooth = 0;
292 rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
294 /* a vector of future Holt-Winters seasonal coefs */
295 unsigned long elapsed_pdp_st;
297 /* number of elapsed PDP steps since last update */
298 unsigned long *rra_step_cnt = NULL;
300 /* number of rows to be updated in an RRA for a data
301 * value. */
302 unsigned long start_pdp_offset;
304 /* number of PDP steps since the last update that
305 * are assigned to the first CDP to be generated
306 * since the last update. */
307 unsigned short scratch_idx;
309 /* index into the CDP scratch array */
310 enum cf_en current_cf;
312 /* numeric id of the current consolidation function */
313 rpnstack_t rpnstack; /* used for COMPUTE DS */
314 int version; /* rrd version */
315 char *endptr; /* used in the conversion */
316 rrd_file_t *rrd_file;
318 rpnstack_init(&rpnstack);
320 /* need at least 1 arguments: data. */
321 if (argc < 1) {
322 rrd_set_error("Not enough arguments");
323 goto err_out;
324 }
326 rrd_file = rrd_open(filename, &rrd, RRD_READWRITE);
327 if (rrd_file == NULL) {
328 goto err_free;
329 }
330 /* We are now at the beginning of the rra's */
331 rra_current = rra_start = rra_begin = rrd_file->header_len;
333 /* initialize time */
334 version = atoi(rrd.stat_head->version);
335 gettimeofday(&tmp_time, 0);
336 normalize_time(&tmp_time);
337 current_time = tmp_time.tv_sec;
338 if (version >= 3) {
339 current_time_usec = tmp_time.tv_usec;
340 } else {
341 current_time_usec = 0;
342 }
344 /* get exclusive lock to whole file.
345 * lock gets removed when we close the file.
346 */
347 if (LockRRD(rrd_file->fd) != 0) {
348 rrd_set_error("could not lock RRD");
349 goto err_close;
350 }
352 if ((updvals =
353 malloc(sizeof(char *) * (rrd.stat_head->ds_cnt + 1))) == NULL) {
354 rrd_set_error("allocating updvals pointer array");
355 goto err_close;
356 }
358 if ((pdp_temp = malloc(sizeof(rrd_value_t)
359 * rrd.stat_head->ds_cnt)) == NULL) {
360 rrd_set_error("allocating pdp_temp ...");
361 goto err_free_updvals;
362 }
364 if ((tmpl_idx = malloc(sizeof(unsigned long)
365 * (rrd.stat_head->ds_cnt + 1))) == NULL) {
366 rrd_set_error("allocating tmpl_idx ...");
367 goto err_free_pdp_temp;
368 }
369 /* initialize tmplt redirector */
370 /* default config example (assume DS 1 is a CDEF DS)
371 tmpl_idx[0] -> 0; (time)
372 tmpl_idx[1] -> 1; (DS 0)
373 tmpl_idx[2] -> 3; (DS 2)
374 tmpl_idx[3] -> 4; (DS 3) */
375 tmpl_idx[0] = 0; /* time */
376 for (i = 1, ii = 1; i <= rrd.stat_head->ds_cnt; i++) {
377 if (dst_conv(rrd.ds_def[i - 1].dst) != DST_CDEF)
378 tmpl_idx[ii++] = i;
379 }
380 tmpl_cnt = ii;
382 if (tmplt) {
383 /* we should work on a writeable copy here */
384 char *dsname;
385 unsigned int tmpl_len;
386 char *tmplt_copy = strdup(tmplt);
388 dsname = tmplt_copy;
389 tmpl_cnt = 1; /* the first entry is the time */
390 tmpl_len = strlen(tmplt_copy);
391 for (i = 0; i <= tmpl_len; i++) {
392 if (tmplt_copy[i] == ':' || tmplt_copy[i] == '\0') {
393 tmplt_copy[i] = '\0';
394 if (tmpl_cnt > rrd.stat_head->ds_cnt) {
395 rrd_set_error
396 ("tmplt contains more DS definitions than RRD");
397 goto err_free_tmpl_idx;
398 }
399 if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd, dsname)) == -1) {
400 rrd_set_error("unknown DS name '%s'", dsname);
401 goto err_free_tmpl_idx;
402 } else {
403 /* the first element is always the time */
404 tmpl_idx[tmpl_cnt - 1]++;
405 /* go to the next entry on the tmplt_copy */
406 dsname = &tmplt_copy[i + 1];
407 /* fix the damage we did before */
408 if (i < tmpl_len) {
409 tmplt_copy[i] = ':';
410 }
412 }
413 }
414 }
415 free(tmplt_copy);
416 }
417 if ((pdp_new = malloc(sizeof(rrd_value_t)
418 * rrd.stat_head->ds_cnt)) == NULL) {
419 rrd_set_error("allocating pdp_new ...");
420 goto err_free_tmpl_idx;
421 }
422 /* loop through the arguments. */
423 for (arg_i = 0; arg_i < argc; arg_i++) {
424 char *stepper = strdup(argv[arg_i]);
425 char *step_start = stepper;
426 char *p;
427 char *parsetime_error = NULL;
428 enum { atstyle, normal } timesyntax;
429 struct rrd_time_value ds_tv;
431 if (stepper == NULL) {
432 rrd_set_error("failed duplication argv entry");
433 free(step_start);
434 goto err_free_pdp_new;
435 }
436 /* initialize all ds input to unknown except the first one
437 which has always got to be set */
438 for (ii = 1; ii <= rrd.stat_head->ds_cnt; ii++)
439 updvals[ii] = "U";
440 updvals[0] = stepper;
441 /* separate all ds elements; first must be examined separately
442 due to alternate time syntax */
443 if ((p = strchr(stepper, '@')) != NULL) {
444 timesyntax = atstyle;
445 *p = '\0';
446 stepper = p + 1;
447 } else if ((p = strchr(stepper, ':')) != NULL) {
448 timesyntax = normal;
449 *p = '\0';
450 stepper = p + 1;
451 } else {
452 rrd_set_error
453 ("expected timestamp not found in data source from %s",
454 argv[arg_i]);
455 free(step_start);
456 break;
457 }
458 ii = 1;
459 updvals[tmpl_idx[ii]] = stepper;
460 while (*stepper) {
461 if (*stepper == ':') {
462 *stepper = '\0';
463 ii++;
464 if (ii < tmpl_cnt) {
465 updvals[tmpl_idx[ii]] = stepper + 1;
466 }
467 }
468 stepper++;
469 }
471 if (ii != tmpl_cnt - 1) {
472 rrd_set_error
473 ("expected %lu data source readings (got %lu) from %s",
474 tmpl_cnt - 1, ii, argv[arg_i]);
475 free(step_start);
476 break;
477 }
479 /* get the time from the reading ... handle N */
480 if (timesyntax == atstyle) {
481 if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
482 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error);
483 free(step_start);
484 break;
485 }
486 if (ds_tv.type == RELATIVE_TO_END_TIME ||
487 ds_tv.type == RELATIVE_TO_START_TIME) {
488 rrd_set_error("specifying time relative to the 'start' "
489 "or 'end' makes no sense here: %s", updvals[0]);
490 free(step_start);
491 break;
492 }
494 current_time = mktime(&ds_tv.tm) + ds_tv.offset;
496 current_time_usec = 0; /* FIXME: how to handle usecs here ? */
498 } else if (strcmp(updvals[0], "N") == 0) {
499 gettimeofday(&tmp_time, 0);
500 normalize_time(&tmp_time);
501 current_time = tmp_time.tv_sec;
502 current_time_usec = tmp_time.tv_usec;
503 } else {
504 double tmp;
505 char *old_locale;
507 old_locale = setlocale(LC_NUMERIC, "C");
508 tmp = strtod(updvals[0], 0);
509 setlocale(LC_NUMERIC, old_locale);
510 current_time = floor(tmp);
511 current_time_usec =
512 (long) ((tmp - (double) current_time) * 1000000.0);
513 }
514 /* dont do any correction for old version RRDs */
515 if (version < 3)
516 current_time_usec = 0;
518 if (current_time < rrd.live_head->last_up ||
519 (current_time == rrd.live_head->last_up &&
520 (long) current_time_usec <=
521 (long) rrd.live_head->last_up_usec)) {
522 rrd_set_error("illegal attempt to update using time %ld when "
523 "last update time is %ld (minimum one second step)",
524 current_time, rrd.live_head->last_up);
525 free(step_start);
526 break;
527 }
529 /* seek to the beginning of the rra's */
530 if (rra_current != rra_begin) {
531 #ifndef HAVE_MMAP
532 if (rrd_seek(rrd_file, rra_begin, SEEK_SET) != 0) {
533 rrd_set_error("seek error in rrd");
534 free(step_start);
535 break;
536 }
537 #endif
538 rra_current = rra_begin;
539 }
540 rra_start = rra_begin;
542 /* when was the current pdp started */
543 proc_pdp_age = rrd.live_head->last_up % rrd.stat_head->pdp_step;
544 proc_pdp_st = rrd.live_head->last_up - proc_pdp_age;
546 /* when did the last pdp_st occur */
547 occu_pdp_age = current_time % rrd.stat_head->pdp_step;
548 occu_pdp_st = current_time - occu_pdp_age;
550 /* interval = current_time - rrd.live_head->last_up; */
551 interval = (double) (current_time - rrd.live_head->last_up)
552 + (double) ((long) current_time_usec -
553 (long) rrd.live_head->last_up_usec) / 1000000.0;
555 if (occu_pdp_st > proc_pdp_st) {
556 /* OK we passed the pdp_st moment */
557 pre_int = (long) occu_pdp_st - rrd.live_head->last_up; /* how much of the input data
558 * occurred before the latest
559 * pdp_st moment*/
560 pre_int -= ((double) rrd.live_head->last_up_usec) / 1000000.0; /* adjust usecs */
561 post_int = occu_pdp_age; /* how much after it */
562 post_int += ((double) current_time_usec) / 1000000.0; /* adjust usecs */
563 } else {
564 pre_int = interval;
565 post_int = 0;
566 }
568 #ifdef DEBUG
569 printf("proc_pdp_age %lu\t"
570 "proc_pdp_st %lu\t"
571 "occu_pfp_age %lu\t"
572 "occu_pdp_st %lu\t"
573 "int %lf\t"
574 "pre_int %lf\t"
575 "post_int %lf\n", proc_pdp_age, proc_pdp_st,
576 occu_pdp_age, occu_pdp_st, interval, pre_int, post_int);
577 #endif
579 /* process the data sources and update the pdp_prep
580 * area accordingly */
581 for (i = 0; i < rrd.stat_head->ds_cnt; i++) {
582 enum dst_en dst_idx;
584 dst_idx = dst_conv(rrd.ds_def[i].dst);
586 /* make sure we do not build diffs with old last_ds values */
587 if (rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt < interval) {
588 strncpy(rrd.pdp_prep[i].last_ds, "U", LAST_DS_LEN - 1);
589 rrd.pdp_prep[i].last_ds[LAST_DS_LEN - 1] = '\0';
590 }
592 /* NOTE: DST_CDEF should never enter this if block, because
593 * updvals[i+1][0] is initialized to 'U'; unless the caller
594 * accidently specified a value for the DST_CDEF. To handle
595 * this case, an extra check is required. */
597 if ((updvals[i + 1][0] != 'U') &&
598 (dst_idx != DST_CDEF) &&
599 rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {
600 double rate = DNAN;
601 char *old_locale;
603 /* the data source type defines how to process the data */
604 /* pdp_new contains rate * time ... eg the bytes
605 * transferred during the interval. Doing it this way saves
606 * a lot of math operations */
607 switch (dst_idx) {
608 case DST_COUNTER:
609 case DST_DERIVE:
610 for (ii = 0; updvals[i + 1][ii] != '\0'; ii++) {
611 if ((updvals[i + 1][ii] < '0'
612 || updvals[i + 1][ii] > '9') && (ii != 0
613 && updvals[i
614 + 1]
615 [ii] != '-')) {
616 rrd_set_error("not a simple integer: '%s'",
617 updvals[i + 1]);
618 break;
619 }
620 }
621 if (rrd_test_error()) {
622 break;
623 }
624 if (rrd.pdp_prep[i].last_ds[0] != 'U') {
625 pdp_new[i] =
626 rrd_diff(updvals[i + 1], rrd.pdp_prep[i].last_ds);
627 if (dst_idx == DST_COUNTER) {
628 /* simple overflow catcher suggested by Andres Kroonmaa */
629 /* this will fail terribly for non 32 or 64 bit counters ... */
630 /* are there any others in SNMP land ? */
631 if (pdp_new[i] < (double) 0.0)
632 pdp_new[i] += (double) 4294967296.0; /* 2^32 */
633 if (pdp_new[i] < (double) 0.0)
634 pdp_new[i] += (double) 18446744069414584320.0;
635 /* 2^64-2^32 */ ;
636 }
637 rate = pdp_new[i] / interval;
638 } else {
639 pdp_new[i] = DNAN;
640 }
641 break;
642 case DST_ABSOLUTE:
643 old_locale = setlocale(LC_NUMERIC, "C");
644 errno = 0;
645 pdp_new[i] = strtod(updvals[i + 1], &endptr);
646 setlocale(LC_NUMERIC, old_locale);
647 if (errno > 0) {
648 rrd_set_error("converting '%s' to float: %s",
649 updvals[i + 1], rrd_strerror(errno));
650 break;
651 };
652 if (endptr[0] != '\0') {
653 rrd_set_error
654 ("conversion of '%s' to float not complete: tail '%s'",
655 updvals[i + 1], endptr);
656 break;
657 }
658 rate = pdp_new[i] / interval;
659 break;
660 case DST_GAUGE:
661 errno = 0;
662 old_locale = setlocale(LC_NUMERIC, "C");
663 pdp_new[i] = strtod(updvals[i + 1], &endptr) * interval;
664 setlocale(LC_NUMERIC, old_locale);
665 if (errno > 0) {
666 rrd_set_error("converting '%s' to float: %s",
667 updvals[i + 1], rrd_strerror(errno));
668 break;
669 };
670 if (endptr[0] != '\0') {
671 rrd_set_error
672 ("conversion of '%s' to float not complete: tail '%s'",
673 updvals[i + 1], endptr);
674 break;
675 }
676 rate = pdp_new[i] / interval;
677 break;
678 default:
679 rrd_set_error("rrd contains unknown DS type : '%s'",
680 rrd.ds_def[i].dst);
681 break;
682 }
683 /* break out of this for loop if the error string is set */
684 if (rrd_test_error()) {
685 break;
686 }
687 /* make sure pdp_temp is neither too large or too small
688 * if any of these occur it becomes unknown ...
689 * sorry folks ... */
690 if (!isnan(rate) &&
691 ((!isnan(rrd.ds_def[i].par[DS_max_val].u_val) &&
692 rate > rrd.ds_def[i].par[DS_max_val].u_val) ||
693 (!isnan(rrd.ds_def[i].par[DS_min_val].u_val) &&
694 rate < rrd.ds_def[i].par[DS_min_val].u_val))) {
695 pdp_new[i] = DNAN;
696 }
697 } else {
698 /* no news is news all the same */
699 pdp_new[i] = DNAN;
700 }
703 /* make a copy of the command line argument for the next run */
704 #ifdef DEBUG
705 fprintf(stderr,
706 "prep ds[%lu]\t"
707 "last_arg '%s'\t"
708 "this_arg '%s'\t"
709 "pdp_new %10.2f\n",
710 i, rrd.pdp_prep[i].last_ds, updvals[i + 1], pdp_new[i]);
711 #endif
712 strncpy(rrd.pdp_prep[i].last_ds, updvals[i + 1], LAST_DS_LEN - 1);
713 rrd.pdp_prep[i].last_ds[LAST_DS_LEN - 1] = '\0';
714 }
715 /* break out of the argument parsing loop if the error_string is set */
716 if (rrd_test_error()) {
717 free(step_start);
718 break;
719 }
720 /* has a pdp_st moment occurred since the last run ? */
722 if (proc_pdp_st == occu_pdp_st) {
723 /* no we have not passed a pdp_st moment. therefore update is simple */
725 for (i = 0; i < rrd.stat_head->ds_cnt; i++) {
726 if (isnan(pdp_new[i])) {
727 /* this is not realy accurate if we use subsecond data arival time
728 should have thought of it when going subsecond resolution ...
729 sorry next format change we will have it! */
730 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt +=
731 floor(interval);
732 } else {
733 if (isnan(rrd.pdp_prep[i].scratch[PDP_val].u_val)) {
734 rrd.pdp_prep[i].scratch[PDP_val].u_val = pdp_new[i];
735 } else {
736 rrd.pdp_prep[i].scratch[PDP_val].u_val += pdp_new[i];
737 }
738 }
739 #ifdef DEBUG
740 fprintf(stderr,
741 "NO PDP ds[%lu]\t"
742 "value %10.2f\t"
743 "unkn_sec %5lu\n",
744 i,
745 rrd.pdp_prep[i].scratch[PDP_val].u_val,
746 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
747 #endif
748 }
749 } else {
750 /* an pdp_st has occurred. */
752 /* in pdp_prep[].scratch[PDP_val].u_val we have collected
753 rate*seconds which occurred up to the last run.
754 pdp_new[] contains rate*seconds from the latest run.
755 pdp_temp[] will contain the rate for cdp */
757 for (i = 0; i < rrd.stat_head->ds_cnt; i++) {
758 /* update pdp_prep to the current pdp_st. */
759 double pre_unknown = 0.0;
761 if (isnan(pdp_new[i])) {
762 /* a final bit of unkonwn to be added bevore calculation
763 we use a temporary variable for this so that we
764 don't have to turn integer lines before using the value */
765 pre_unknown = pre_int;
766 } else {
767 if (isnan(rrd.pdp_prep[i].scratch[PDP_val].u_val)) {
768 rrd.pdp_prep[i].scratch[PDP_val].u_val =
769 pdp_new[i] / interval * pre_int;
770 } else {
771 rrd.pdp_prep[i].scratch[PDP_val].u_val +=
772 pdp_new[i] / interval * pre_int;
773 }
774 }
777 /* if too much of the pdp_prep is unknown we dump it */
778 if (
779 /* removed because this does not agree with the
780 definition that a heartbeat can be unknown */
781 /* (rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt
782 > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) || */
783 /* if the interval is larger thatn mrhb we get NAN */
784 (interval > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) ||
785 (occu_pdp_st - proc_pdp_st <=
786 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)) {
787 pdp_temp[i] = DNAN;
788 } else {
789 pdp_temp[i] = rrd.pdp_prep[i].scratch[PDP_val].u_val
790 / ((double) (occu_pdp_st - proc_pdp_st
791 -
792 rrd.pdp_prep[i].
793 scratch[PDP_unkn_sec_cnt].u_cnt)
794 - pre_unknown);
795 }
797 /* process CDEF data sources; remember each CDEF DS can
798 * only reference other DS with a lower index number */
799 if (dst_conv(rrd.ds_def[i].dst) == DST_CDEF) {
800 rpnp_t *rpnp;
802 rpnp =
803 rpn_expand((rpn_cdefds_t *) &
804 (rrd.ds_def[i].par[DS_cdef]));
805 /* substitue data values for OP_VARIABLE nodes */
806 for (ii = 0; rpnp[ii].op != OP_END; ii++) {
807 if (rpnp[ii].op == OP_VARIABLE) {
808 rpnp[ii].op = OP_NUMBER;
809 rpnp[ii].val = pdp_temp[rpnp[ii].ptr];
810 }
811 }
812 /* run the rpn calculator */
813 if (rpn_calc(rpnp, &rpnstack, 0, pdp_temp, i) == -1) {
814 free(rpnp);
815 break; /* exits the data sources pdp_temp loop */
816 }
817 }
819 /* make pdp_prep ready for the next run */
820 if (isnan(pdp_new[i])) {
821 /* this is not realy accurate if we use subsecond data arival time
822 should have thought of it when going subsecond resolution ...
823 sorry next format change we will have it! */
824 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt =
825 floor(post_int);
826 rrd.pdp_prep[i].scratch[PDP_val].u_val = DNAN;
827 } else {
828 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = 0;
829 rrd.pdp_prep[i].scratch[PDP_val].u_val =
830 pdp_new[i] / interval * post_int;
831 }
833 #ifdef DEBUG
834 fprintf(stderr,
835 "PDP UPD ds[%lu]\t"
836 "pdp_temp %10.2f\t"
837 "new_prep %10.2f\t"
838 "new_unkn_sec %5lu\n",
839 i, pdp_temp[i],
840 rrd.pdp_prep[i].scratch[PDP_val].u_val,
841 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
842 #endif
843 }
845 /* if there were errors during the last loop, bail out here */
846 if (rrd_test_error()) {
847 free(step_start);
848 break;
849 }
851 /* compute the number of elapsed pdp_st moments */
852 elapsed_pdp_st =
853 (occu_pdp_st - proc_pdp_st) / rrd.stat_head->pdp_step;
854 #ifdef DEBUG
855 fprintf(stderr, "elapsed PDP steps: %lu\n", elapsed_pdp_st);
856 #endif
857 if (rra_step_cnt == NULL) {
858 rra_step_cnt = (unsigned long *)
859 malloc((rrd.stat_head->rra_cnt) * sizeof(unsigned long));
860 }
862 for (i = 0, rra_start = rra_begin;
863 i < rrd.stat_head->rra_cnt;
864 rra_start +=
865 rrd.rra_def[i].row_cnt * rrd.stat_head->ds_cnt *
866 sizeof(rrd_value_t), i++) {
867 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
868 start_pdp_offset = rrd.rra_def[i].pdp_cnt -
869 (proc_pdp_st / rrd.stat_head->pdp_step) %
870 rrd.rra_def[i].pdp_cnt;
871 if (start_pdp_offset <= elapsed_pdp_st) {
872 rra_step_cnt[i] = (elapsed_pdp_st - start_pdp_offset) /
873 rrd.rra_def[i].pdp_cnt + 1;
874 } else {
875 rra_step_cnt[i] = 0;
876 }
878 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
879 /* If this is a bulk update, we need to skip ahead in
880 the seasonal arrays so that they will be correct for
881 the next observed value;
882 note that for the bulk update itself, no update will
883 occur to DEVSEASONAL or SEASONAL; futhermore, HWPREDICT
884 and DEVPREDICT will be set to DNAN. */
885 if (rra_step_cnt[i] > 2) {
886 /* skip update by resetting rra_step_cnt[i],
887 note that this is not data source specific; this is
888 due to the bulk update, not a DNAN value for the
889 specific data source. */
890 rra_step_cnt[i] = 0;
891 lookup_seasonal(&rrd, i, rra_start, rrd_file,
892 elapsed_pdp_st, &last_seasonal_coef);
893 lookup_seasonal(&rrd, i, rra_start, rrd_file,
894 elapsed_pdp_st + 1, &seasonal_coef);
895 }
897 /* periodically run a smoother for seasonal effects */
898 /* Need to use first cdp parameter buffer to track
899 * burnin (burnin requires a specific smoothing schedule).
900 * The CDP_init_seasonal parameter is really an RRA level,
901 * not a data source within RRA level parameter, but the rra_def
902 * is read only for rrd_update (not flushed to disk). */
903 iii = i * (rrd.stat_head->ds_cnt);
904 if (rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt
905 <= BURNIN_CYCLES) {
906 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
907 > rrd.rra_def[i].row_cnt - 1) {
908 /* mark off one of the burnin cycles */
909 ++(rrd.cdp_prep[iii].scratch[CDP_init_seasonal].
910 u_cnt);
911 schedule_smooth = 1;
912 }
913 } else {
914 /* someone has no doubt invented a trick to deal with this
915 * wrap around, but at least this code is clear. */
916 if (rrd.rra_def[i].par[RRA_seasonal_smooth_idx].
917 u_cnt > rrd.rra_ptr[i].cur_row) {
918 /* here elapsed_pdp_st = rra_step_cnt[i] because of 1-1
919 * mapping between PDP and CDP */
920 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
921 >=
922 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].
923 u_cnt) {
924 #ifdef DEBUG
925 fprintf(stderr,
926 "schedule_smooth 1: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
927 rrd.rra_ptr[i].cur_row,
928 elapsed_pdp_st,
929 rrd.rra_def[i].
930 par[RRA_seasonal_smooth_idx].u_cnt);
931 #endif
932 schedule_smooth = 1;
933 }
934 } else {
935 /* can't rely on negative numbers because we are working with
936 * unsigned values */
937 /* Don't need modulus here. If we've wrapped more than once, only
938 * one smooth is executed at the end. */
939 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st >=
940 rrd.rra_def[i].row_cnt
941 && rrd.rra_ptr[i].cur_row + elapsed_pdp_st -
942 rrd.rra_def[i].row_cnt >=
943 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].
944 u_cnt) {
945 #ifdef DEBUG
946 fprintf(stderr,
947 "schedule_smooth 2: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
948 rrd.rra_ptr[i].cur_row,
949 elapsed_pdp_st,
950 rrd.rra_def[i].
951 par[RRA_seasonal_smooth_idx].u_cnt);
952 #endif
953 schedule_smooth = 1;
954 }
955 }
956 }
958 rra_current = rrd_tell(rrd_file);
959 }
960 /* if cf is DEVSEASONAL or SEASONAL */
961 if (rrd_test_error())
962 break;
964 /* update CDP_PREP areas */
965 /* loop over data soures within each RRA */
966 for (ii = 0; ii < rrd.stat_head->ds_cnt; ii++) {
968 /* iii indexes the CDP prep area for this data source within the RRA */
969 iii = i * rrd.stat_head->ds_cnt + ii;
971 if (rrd.rra_def[i].pdp_cnt > 1) {
973 if (rra_step_cnt[i] > 0) {
974 /* If we are in this block, as least 1 CDP value will be written to
975 * disk, this is the CDP_primary_val entry. If more than 1 value needs
976 * to be written, then the "fill in" value is the CDP_secondary_val
977 * entry. */
978 if (isnan(pdp_temp[ii])) {
979 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
980 u_cnt += start_pdp_offset;
981 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
982 u_val = DNAN;
983 } else {
984 /* CDP_secondary value is the RRA "fill in" value for intermediary
985 * CDP data entries. No matter the CF, the value is the same because
986 * the average, max, min, and last of a list of identical values is
987 * the same, namely, the value itself. */
988 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
989 u_val = pdp_temp[ii];
990 }
992 if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
993 u_cnt >
994 rrd.rra_def[i].pdp_cnt *
995 rrd.rra_def[i].par[RRA_cdp_xff_val].u_val) {
996 rrd.cdp_prep[iii].scratch[CDP_primary_val].
997 u_val = DNAN;
998 /* initialize carry over */
999 if (current_cf == CF_AVERAGE) {
1000 if (isnan(pdp_temp[ii])) {
1001 rrd.cdp_prep[iii].scratch[CDP_val].
1002 u_val = DNAN;
1003 } else {
1004 rrd.cdp_prep[iii].scratch[CDP_val].
1005 u_val =
1006 pdp_temp[ii] *
1007 ((elapsed_pdp_st -
1008 start_pdp_offset) %
1009 rrd.rra_def[i].pdp_cnt);
1010 }
1011 } else {
1012 rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1013 pdp_temp[ii];
1014 }
1015 } else {
1016 rrd_value_t cum_val, cur_val;
1018 switch (current_cf) {
1019 case CF_AVERAGE:
1020 cum_val =
1021 IFDNAN(rrd.cdp_prep[iii].
1022 scratch[CDP_val].u_val, 0.0);
1023 cur_val = IFDNAN(pdp_temp[ii], 0.0);
1024 rrd.cdp_prep[iii].
1025 scratch[CDP_primary_val].u_val =
1026 (cum_val +
1027 cur_val * start_pdp_offset) /
1028 (rrd.rra_def[i].pdp_cnt -
1029 rrd.cdp_prep[iii].
1030 scratch[CDP_unkn_pdp_cnt].u_cnt);
1031 /* initialize carry over value */
1032 if (isnan(pdp_temp[ii])) {
1033 rrd.cdp_prep[iii].scratch[CDP_val].
1034 u_val = DNAN;
1035 } else {
1036 rrd.cdp_prep[iii].scratch[CDP_val].
1037 u_val =
1038 pdp_temp[ii] *
1039 ((elapsed_pdp_st -
1040 start_pdp_offset) %
1041 rrd.rra_def[i].pdp_cnt);
1042 }
1043 break;
1044 case CF_MAXIMUM:
1045 cum_val =
1046 IFDNAN(rrd.cdp_prep[iii].
1047 scratch[CDP_val].u_val, -DINF);
1048 cur_val = IFDNAN(pdp_temp[ii], -DINF);
1049 #ifdef DEBUG
1050 if (isnan
1051 (rrd.cdp_prep[iii].scratch[CDP_val].
1052 u_val) && isnan(pdp_temp[ii])) {
1053 fprintf(stderr,
1054 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1055 i, ii);
1056 exit(-1);
1057 }
1058 #endif
1059 if (cur_val > cum_val)
1060 rrd.cdp_prep[iii].
1061 scratch[CDP_primary_val].u_val =
1062 cur_val;
1063 else
1064 rrd.cdp_prep[iii].
1065 scratch[CDP_primary_val].u_val =
1066 cum_val;
1067 /* initialize carry over value */
1068 rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1069 pdp_temp[ii];
1070 break;
1071 case CF_MINIMUM:
1072 cum_val =
1073 IFDNAN(rrd.cdp_prep[iii].
1074 scratch[CDP_val].u_val, DINF);
1075 cur_val = IFDNAN(pdp_temp[ii], DINF);
1076 #ifdef DEBUG
1077 if (isnan
1078 (rrd.cdp_prep[iii].scratch[CDP_val].
1079 u_val) && isnan(pdp_temp[ii])) {
1080 fprintf(stderr,
1081 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1082 i, ii);
1083 exit(-1);
1084 }
1085 #endif
1086 if (cur_val < cum_val)
1087 rrd.cdp_prep[iii].
1088 scratch[CDP_primary_val].u_val =
1089 cur_val;
1090 else
1091 rrd.cdp_prep[iii].
1092 scratch[CDP_primary_val].u_val =
1093 cum_val;
1094 /* initialize carry over value */
1095 rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1096 pdp_temp[ii];
1097 break;
1098 case CF_LAST:
1099 default:
1100 rrd.cdp_prep[iii].
1101 scratch[CDP_primary_val].u_val =
1102 pdp_temp[ii];
1103 /* initialize carry over value */
1104 rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1105 pdp_temp[ii];
1106 break;
1107 }
1108 } /* endif meets xff value requirement for a valid value */
1109 /* initialize carry over CDP_unkn_pdp_cnt, this must happen after CDP_primary_val
1110 * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1111 if (isnan(pdp_temp[ii]))
1112 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
1113 u_cnt =
1114 (elapsed_pdp_st -
1115 start_pdp_offset) %
1116 rrd.rra_def[i].pdp_cnt;
1117 else
1118 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
1119 u_cnt = 0;
1120 } else { /* rra_step_cnt[i] == 0 */
1122 #ifdef DEBUG
1123 if (isnan
1124 (rrd.cdp_prep[iii].scratch[CDP_val].u_val)) {
1125 fprintf(stderr,
1126 "schedule CDP_val update, RRA %lu DS %lu, DNAN\n",
1127 i, ii);
1128 } else {
1129 fprintf(stderr,
1130 "schedule CDP_val update, RRA %lu DS %lu, %10.2f\n",
1131 i, ii,
1132 rrd.cdp_prep[iii].scratch[CDP_val].
1133 u_val);
1134 }
1135 #endif
1136 if (isnan(pdp_temp[ii])) {
1137 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
1138 u_cnt += elapsed_pdp_st;
1139 } else
1140 if (isnan
1141 (rrd.cdp_prep[iii].scratch[CDP_val].
1142 u_val)) {
1143 if (current_cf == CF_AVERAGE) {
1144 rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1145 pdp_temp[ii] * elapsed_pdp_st;
1146 } else {
1147 rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1148 pdp_temp[ii];
1149 }
1150 #ifdef DEBUG
1151 fprintf(stderr,
1152 "Initialize CDP_val for RRA %lu DS %lu: %10.2f\n",
1153 i, ii,
1154 rrd.cdp_prep[iii].scratch[CDP_val].
1155 u_val);
1156 #endif
1157 } else {
1158 switch (current_cf) {
1159 case CF_AVERAGE:
1160 rrd.cdp_prep[iii].scratch[CDP_val].
1161 u_val +=
1162 pdp_temp[ii] * elapsed_pdp_st;
1163 break;
1164 case CF_MINIMUM:
1165 if (pdp_temp[ii] <
1166 rrd.cdp_prep[iii].scratch[CDP_val].
1167 u_val)
1168 rrd.cdp_prep[iii].scratch[CDP_val].
1169 u_val = pdp_temp[ii];
1170 break;
1171 case CF_MAXIMUM:
1172 if (pdp_temp[ii] >
1173 rrd.cdp_prep[iii].scratch[CDP_val].
1174 u_val)
1175 rrd.cdp_prep[iii].scratch[CDP_val].
1176 u_val = pdp_temp[ii];
1177 break;
1178 case CF_LAST:
1179 default:
1180 rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1181 pdp_temp[ii];
1182 break;
1183 }
1184 }
1185 }
1186 } else { /* rrd.rra_def[i].pdp_cnt == 1 */
1187 if (elapsed_pdp_st > 2) {
1188 switch (current_cf) {
1189 case CF_AVERAGE:
1190 default:
1191 rrd.cdp_prep[iii].scratch[CDP_primary_val].
1192 u_val = pdp_temp[ii];
1193 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
1194 u_val = pdp_temp[ii];
1195 break;
1196 case CF_SEASONAL:
1197 case CF_DEVSEASONAL:
1198 /* need to update cached seasonal values, so they are consistent
1199 * with the bulk update */
1200 /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1201 * CDP_last_deviation are the same. */
1202 rrd.cdp_prep[iii].
1203 scratch[CDP_hw_last_seasonal].u_val =
1204 last_seasonal_coef[ii];
1205 rrd.cdp_prep[iii].scratch[CDP_hw_seasonal].
1206 u_val = seasonal_coef[ii];
1207 break;
1208 case CF_HWPREDICT:
1209 case CF_MHWPREDICT:
1210 /* need to update the null_count and last_null_count.
1211 * even do this for non-DNAN pdp_temp because the
1212 * algorithm is not learning from batch updates. */
1213 rrd.cdp_prep[iii].scratch[CDP_null_count].
1214 u_cnt += elapsed_pdp_st;
1215 rrd.cdp_prep[iii].
1216 scratch[CDP_last_null_count].u_cnt +=
1217 elapsed_pdp_st - 1;
1218 /* fall through */
1219 case CF_DEVPREDICT:
1220 rrd.cdp_prep[iii].scratch[CDP_primary_val].
1221 u_val = DNAN;
1222 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
1223 u_val = DNAN;
1224 break;
1225 case CF_FAILURES:
1226 /* do not count missed bulk values as failures */
1227 rrd.cdp_prep[iii].scratch[CDP_primary_val].
1228 u_val = 0;
1229 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
1230 u_val = 0;
1231 /* need to reset violations buffer.
1232 * could do this more carefully, but for now, just
1233 * assume a bulk update wipes away all violations. */
1234 erase_violations(&rrd, iii, i);
1235 break;
1236 }
1237 }
1238 } /* endif rrd.rra_def[i].pdp_cnt == 1 */
1240 if (rrd_test_error())
1241 break;
1243 } /* endif data sources loop */
1244 } /* end RRA Loop */
1246 /* this loop is only entered if elapsed_pdp_st < 3 */
1247 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1248 j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val) {
1249 for (i = 0, rra_start = rra_begin;
1250 i < rrd.stat_head->rra_cnt;
1251 rra_start +=
1252 rrd.rra_def[i].row_cnt * rrd.stat_head->ds_cnt *
1253 sizeof(rrd_value_t), i++) {
1254 if (rrd.rra_def[i].pdp_cnt > 1)
1255 continue;
1257 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
1258 if (current_cf == CF_SEASONAL
1259 || current_cf == CF_DEVSEASONAL) {
1260 lookup_seasonal(&rrd, i, rra_start, rrd_file,
1261 elapsed_pdp_st + (scratch_idx ==
1262 CDP_primary_val ? 1
1263 : 2),
1264 &seasonal_coef);
1265 rra_current = rrd_tell(rrd_file);
1266 }
1267 if (rrd_test_error())
1268 break;
1269 /* loop over data sources within each RRA */
1270 for (ii = 0; ii < rrd.stat_head->ds_cnt; ii++) {
1271 update_aberrant_CF(&rrd, pdp_temp[ii], current_cf,
1272 i * (rrd.stat_head->ds_cnt) + ii,
1273 i, ii, scratch_idx, seasonal_coef);
1274 }
1275 } /* end RRA Loop */
1276 if (rrd_test_error())
1277 break;
1278 } /* end elapsed_pdp_st loop */
1280 if (rrd_test_error())
1281 break;
1283 /* Ready to write to disk */
1284 /* Move sequentially through the file, writing one RRA at a time.
1285 * Note this architecture divorces the computation of CDP with
1286 * flushing updated RRA entries to disk. */
1287 for (i = 0, rra_start = rra_begin;
1288 i < rrd.stat_head->rra_cnt;
1289 rra_start +=
1290 rrd.rra_def[i].row_cnt * rrd.stat_head->ds_cnt *
1291 sizeof(rrd_value_t), i++) {
1292 /* is there anything to write for this RRA? If not, continue. */
1293 if (rra_step_cnt[i] == 0)
1294 continue;
1296 /* write the first row */
1297 #ifdef DEBUG
1298 fprintf(stderr, " -- RRA Preseek %ld\n", rrd_file->pos);
1299 #endif
1300 rrd.rra_ptr[i].cur_row++;
1301 if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
1302 rrd.rra_ptr[i].cur_row = 0; /* wrap around */
1303 /* position on the first row */
1304 rra_pos_tmp = rra_start +
1305 (rrd.stat_head->ds_cnt) * (rrd.rra_ptr[i].cur_row) *
1306 sizeof(rrd_value_t);
1307 if (rra_pos_tmp != rra_current) {
1308 if (rrd_seek(rrd_file, rra_pos_tmp, SEEK_SET) != 0) {
1309 rrd_set_error("seek error in rrd");
1310 break;
1311 }
1312 rra_current = rra_pos_tmp;
1313 }
1314 #ifdef DEBUG
1315 fprintf(stderr, " -- RRA Postseek %ld\n", rrd_file->pos);
1316 #endif
1317 scratch_idx = CDP_primary_val;
1318 if (pcdp_summary != NULL) {
1319 rra_time = (current_time - current_time
1320 % (rrd.rra_def[i].pdp_cnt *
1321 rrd.stat_head->pdp_step))
1322 -
1323 ((rra_step_cnt[i] -
1324 1) * rrd.rra_def[i].pdp_cnt *
1325 rrd.stat_head->pdp_step);
1326 }
1327 pcdp_summary =
1328 write_RRA_row(rrd_file, &rrd, i, &rra_current,
1329 scratch_idx, pcdp_summary, &rra_time);
1330 if (rrd_test_error())
1331 break;
1333 /* write other rows of the bulk update, if any */
1334 scratch_idx = CDP_secondary_val;
1335 for (; rra_step_cnt[i] > 1; rra_step_cnt[i]--) {
1336 if (++rrd.rra_ptr[i].cur_row == rrd.rra_def[i].row_cnt) {
1337 #ifdef DEBUG
1338 fprintf(stderr,
1339 "Wraparound for RRA %s, %lu updates left\n",
1340 rrd.rra_def[i].cf_nam, rra_step_cnt[i] - 1);
1341 #endif
1342 /* wrap */
1343 rrd.rra_ptr[i].cur_row = 0;
1344 /* seek back to beginning of current rra */
1345 if (rrd_seek(rrd_file, rra_start, SEEK_SET) != 0) {
1346 rrd_set_error("seek error in rrd");
1347 break;
1348 }
1349 #ifdef DEBUG
1350 fprintf(stderr, " -- Wraparound Postseek %ld\n",
1351 rrd_file->pos);
1352 #endif
1353 rra_current = rra_start;
1354 }
1355 if (pcdp_summary != NULL) {
1356 rra_time = (current_time - current_time
1357 % (rrd.rra_def[i].pdp_cnt *
1358 rrd.stat_head->pdp_step))
1359 -
1360 ((rra_step_cnt[i] -
1361 2) * rrd.rra_def[i].pdp_cnt *
1362 rrd.stat_head->pdp_step);
1363 }
1364 pcdp_summary =
1365 write_RRA_row(rrd_file, &rrd, i, &rra_current,
1366 scratch_idx, pcdp_summary, &rra_time);
1367 }
1369 if (rrd_test_error())
1370 break;
1371 } /* RRA LOOP */
1373 /* break out of the argument parsing loop if error_string is set */
1374 if (rrd_test_error()) {
1375 free(step_start);
1376 break;
1377 }
1379 } /* endif a pdp_st has occurred */
1380 rrd.live_head->last_up = current_time;
1381 rrd.live_head->last_up_usec = current_time_usec;
1382 free(step_start);
1383 } /* function argument loop */
1385 if (seasonal_coef != NULL)
1386 free(seasonal_coef);
1387 if (last_seasonal_coef != NULL)
1388 free(last_seasonal_coef);
1389 if (rra_step_cnt != NULL)
1390 free(rra_step_cnt);
1391 rpnstack_free(&rpnstack);
1393 #if 0
1394 //rrd_flush(rrd_file); //XXX: really needed?
1395 #endif
1396 /* if we got here and if there is an error and if the file has not been
1397 * written to, then close things up and return. */
1398 if (rrd_test_error()) {
1399 goto err_free_pdp_new;
1400 }
1402 /* aargh ... that was tough ... so many loops ... anyway, its done.
1403 * we just need to write back the live header portion now*/
1405 if (rrd_seek(rrd_file, (sizeof(stat_head_t)
1406 + sizeof(ds_def_t) * rrd.stat_head->ds_cnt
1407 + sizeof(rra_def_t) * rrd.stat_head->rra_cnt),
1408 SEEK_SET) != 0) {
1409 rrd_set_error("seek rrd for live header writeback");
1410 goto err_free_pdp_new;
1411 }
1412 /* for mmap, we did already write to the underlying mapping, so we do
1413 not need to write again. */
1414 #ifndef HAVE_MMAP
1415 if (version >= 3) {
1416 if (rrd_write(rrd_file, rrd.live_head,
1417 sizeof(live_head_t) * 1) != sizeof(live_head_t) * 1) {
1418 rrd_set_error("rrd_write live_head to rrd");
1419 goto err_free_pdp_new;
1420 }
1421 } else {
1422 if (rrd_write(rrd_file, &rrd.live_head->last_up,
1423 sizeof(time_t) * 1) != sizeof(time_t) * 1) {
1424 rrd_set_error("rrd_write live_head to rrd");
1425 goto err_free_pdp_new;
1426 }
1427 }
1430 if (rrd_write(rrd_file, rrd.pdp_prep,
1431 sizeof(pdp_prep_t) * rrd.stat_head->ds_cnt)
1432 != (ssize_t) (sizeof(pdp_prep_t) * rrd.stat_head->ds_cnt)) {
1433 rrd_set_error("rrd_write pdp_prep to rrd");
1434 goto err_free_pdp_new;
1435 }
1437 if (rrd_write(rrd_file, rrd.cdp_prep,
1438 sizeof(cdp_prep_t) * rrd.stat_head->rra_cnt *
1439 rrd.stat_head->ds_cnt)
1440 != (ssize_t) (sizeof(cdp_prep_t) * rrd.stat_head->rra_cnt *
1441 rrd.stat_head->ds_cnt)) {
1443 rrd_set_error("rrd_write cdp_prep to rrd");
1444 goto err_free_pdp_new;
1445 }
1447 if (rrd_write(rrd_file, rrd.rra_ptr,
1448 sizeof(rra_ptr_t) * rrd.stat_head->rra_cnt)
1449 != (ssize_t) (sizeof(rra_ptr_t) * rrd.stat_head->rra_cnt)) {
1450 rrd_set_error("rrd_write rra_ptr to rrd");
1451 goto err_free_pdp_new;
1452 }
1453 #endif
1455 /* rrd_flush(rrd_file); */
1457 /* calling the smoothing code here guarantees at most
1458 * one smoothing operation per rrd_update call. Unfortunately,
1459 * it is possible with bulk updates, or a long-delayed update
1460 * for smoothing to occur off-schedule. This really isn't
1461 * critical except during the burning cycles. */
1462 if (schedule_smooth) {
1464 rra_start = rra_begin;
1465 for (i = 0; i < rrd.stat_head->rra_cnt; ++i) {
1466 if (cf_conv(rrd.rra_def[i].cf_nam) == CF_DEVSEASONAL ||
1467 cf_conv(rrd.rra_def[i].cf_nam) == CF_SEASONAL) {
1468 #ifdef DEBUG
1469 fprintf(stderr, "Running smoother for rra %ld\n", i);
1470 #endif
1471 apply_smoother(&rrd, i, rra_start, rrd_file);
1472 if (rrd_test_error())
1473 break;
1474 }
1475 rra_start += rrd.rra_def[i].row_cnt
1476 * rrd.stat_head->ds_cnt * sizeof(rrd_value_t);
1477 }
1478 }
1480 /* rrd_dontneed(rrd_file,&rrd); */
1481 rrd_free(&rrd);
1482 rrd_close(rrd_file);
1484 free(pdp_new);
1485 free(tmpl_idx);
1486 free(pdp_temp);
1487 free(updvals);
1488 return (0);
1490 err_free_pdp_new:
1491 free(pdp_new);
1492 err_free_tmpl_idx:
1493 free(tmpl_idx);
1494 err_free_pdp_temp:
1495 free(pdp_temp);
1496 err_free_updvals:
1497 free(updvals);
1498 err_close:
1499 rrd_close(rrd_file);
1500 err_free:
1501 rrd_free(&rrd);
1502 err_out:
1503 return (-1);
1504 }
/*
 * Try to take an exclusive lock covering the whole RRD file.
 *
 * in_file: open file descriptor of the file to lock.
 *
 * The request is non-blocking on both platforms (F_SETLK / _LK_NBLCK):
 * if the lock cannot be granted the call fails instead of waiting.
 * The lock gets removed when the file is closed.
 *
 * returns 0 on success; otherwise the non-zero result of the underlying
 * locking call (or -1 when the file size cannot be determined on WIN32).
 */
int LockRRD(
    int in_file)
{
#if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
    struct _stat file_info;

    /* _locking needs a byte count, so the file size has to be known */
    if (_fstat(in_file, &file_info) != 0)
        return (-1);

    /* NOTE(review): _locking counts bytes from the current file position,
     * so this spans the whole file only when the descriptor sits at
     * offset 0 -- confirm against the callers. */
    return (_locking(in_file, _LK_NBLCK, file_info.st_size));
#else
    struct flock whole_file;

    whole_file.l_type = F_WRLCK;    /* exclusive write lock */
    whole_file.l_whence = SEEK_SET; /* l_start is relative to start of file */
    whole_file.l_start = 0;         /* begin at the first byte */
    whole_file.l_len = 0;           /* zero length: lock through end of file */

    return (fcntl(in_file, F_SETLK, &whole_file));
#endif
}