1 /*****************************************************************************
2 * RRDtool 1.2.23 Copyright by Tobi Oetiker, 1997-2007
3 *****************************************************************************
4 * rrd_update.c RRD Update Function
5 *****************************************************************************
6 * $Id$
7 *****************************************************************************/
9 #include "rrd_tool.h"
11 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
12 #include <sys/locking.h>
13 #include <sys/stat.h>
14 #include <io.h>
15 #endif
17 #include <locale.h>
19 #include "rrd_hw.h"
20 #include "rrd_rpncalc.h"
22 #include "rrd_is_thread_safe.h"
23 #include "unused.h"
#if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
/*
 * WIN32 does not have gettimeofday and struct timeval. This is a quick and
 * dirty replacement built on top of _ftime().
 */
#include <sys/timeb.h>

#ifndef __MINGW32__
struct timeval {
    time_t    tv_sec;   /* seconds */
    long      tv_usec;  /* microseconds */
};
#endif

struct __timezone {
    int       tz_minuteswest;   /* minutes W of Greenwich */
    int       tz_dsttime;   /* type of dst correction */
};

/*
 * Store the current time in *t.
 *
 * t  - receives seconds in tv_sec and microseconds in tv_usec; the
 *      microsecond part is derived from the millisecond field reported by
 *      _ftime(), so it only ever has millisecond granularity.
 * tz - accepted so that call sites look like the POSIX function; it is
 *      never read or written.
 *
 * Always returns 0.
 */
static int gettimeofday(
    struct timeval *t,
    struct __timezone *tz)
{
    struct _timeb current_time;

    (void) tz;          /* intentionally unused */

    _ftime(&current_time);

    t->tv_sec = current_time.time;
    t->tv_usec = current_time.millitm * 1000;   /* ms -> us */

    return 0;
}

#endif
/*
 * Normalize a time as returned by gettimeofday: the usec part must always
 * be >= 0. If tv_usec is negative, exactly one second is borrowed from
 * tv_sec and added to tv_usec; otherwise *t is left untouched.
 */
static inline void normalize_time(
    struct timeval *t)
{
    if (t->tv_usec >= 0)
        return;         /* already in the expected form */

    t->tv_sec -= 1;
    t->tv_usec += 1000000L;
}
73 static inline info_t *write_RRA_row(
74 rrd_file_t *rrd_file,
75 rrd_t *rrd,
76 unsigned long rra_idx,
77 unsigned long *rra_current,
78 unsigned short CDP_scratch_idx,
79 info_t *pcdp_summary,
80 time_t *rra_time)
81 {
82 unsigned long ds_idx, cdp_idx;
83 infoval iv;
85 for (ds_idx = 0; ds_idx < rrd->stat_head->ds_cnt; ds_idx++) {
86 /* compute the cdp index */
87 cdp_idx = rra_idx * (rrd->stat_head->ds_cnt) + ds_idx;
88 #ifdef DEBUG
89 fprintf(stderr, " -- RRA WRITE VALUE %e, at %ld CF:%s\n",
90 rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,
91 rrd_file->pos, rrd->rra_def[rra_idx].cf_nam);
92 #endif
93 if (pcdp_summary != NULL) {
94 iv.u_val = rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
95 /* append info to the return hash */
96 pcdp_summary = info_push(pcdp_summary,
97 sprintf_alloc("[%d]RRA[%s][%lu]DS[%s]",
98 *rra_time,
99 rrd->rra_def[rra_idx].
100 cf_nam,
101 rrd->rra_def[rra_idx].
102 pdp_cnt,
103 rrd->ds_def[ds_idx].
104 ds_nam), RD_I_VAL, iv);
105 }
106 if (rrd_write
107 (rrd_file,
108 &(rrd->cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
109 sizeof(rrd_value_t)) != sizeof(rrd_value_t)) {
110 rrd_set_error("writing rrd: %s", rrd_strerror(errno));
111 return 0;
112 }
113 *rra_current += sizeof(rrd_value_t);
114 }
115 return (pcdp_summary);
116 }
118 int rrd_update_r(
119 const char *filename,
120 const char *tmplt,
121 int argc,
122 const char **argv);
123 int _rrd_update(
124 const char *filename,
125 const char *tmplt,
126 int argc,
127 const char **argv,
128 info_t *);
/* Yield X unless X is NaN, in which case yield Y. X is evaluated twice, so
 * it must be free of side effects. The expansion is a parenthesized
 * expression with no trailing semicolon, so the macro can be used inside
 * larger expressions as well as in plain assignments. */
#define IFDNAN(X,Y) (isnan(X) ? (Y) : (X))
133 info_t *rrd_update_v(
134 int argc,
135 char **argv)
136 {
137 char *tmplt = NULL;
138 info_t *result = NULL;
139 infoval rc;
140 struct option long_options[] = {
141 {"template", required_argument, 0, 't'},
142 {0, 0, 0, 0}
143 };
145 rc.u_int = -1;
146 optind = 0;
147 opterr = 0; /* initialize getopt */
149 while (1) {
150 int option_index = 0;
151 int opt;
153 opt = getopt_long(argc, argv, "t:", long_options, &option_index);
155 if (opt == EOF)
156 break;
158 switch (opt) {
159 case 't':
160 tmplt = optarg;
161 break;
163 case '?':
164 rrd_set_error("unknown option '%s'", argv[optind - 1]);
165 goto end_tag;
166 }
167 }
169 /* need at least 2 arguments: filename, data. */
170 if (argc - optind < 2) {
171 rrd_set_error("Not enough arguments");
172 goto end_tag;
173 }
174 rc.u_int = 0;
175 result = info_push(NULL, sprintf_alloc("return_value"), RD_I_INT, rc);
176 rc.u_int = _rrd_update(argv[optind], tmplt,
177 argc - optind - 1,
178 (const char **) (argv + optind + 1), result);
179 result->value.u_int = rc.u_int;
180 end_tag:
181 return result;
182 }
/*
 * Command-line style update.
 *
 * Recognized option: -t / --template <arg>. After option parsing at least
 * two arguments (file name, data) must remain.
 *
 * Returns -1 after recording a message with rrd_set_error() when an option
 * is rejected or too few arguments remain; otherwise returns whatever
 * rrd_update_r() returns for the remaining arguments.
 *
 * The template is passed on as the optarg pointer itself, the same way
 * rrd_update_v does it. No private copy is allocated, so there is nothing
 * to release on the early error returns or when -t is given more than once.
 */
int rrd_update(
    int argc,
    char **argv)
{
    struct option long_options[] = {
        {"template", required_argument, 0, 't'},
        {0, 0, 0, 0}
    };
    int       option_index = 0;
    int       opt;
    const char *tmplt = NULL;

    optind = 0;
    opterr = 0;         /* initialize getopt */

    while (1) {
        opt = getopt_long(argc, argv, "t:", long_options, &option_index);

        if (opt == EOF)
            break;

        switch (opt) {
        case 't':
            /* optarg points into argv, which outlives this call */
            tmplt = optarg;
            break;

        case '?':
            rrd_set_error("unknown option '%s'", argv[optind - 1]);
            return (-1);
        }
    }

    /* need at least 2 arguments: filename, data. */
    if (argc - optind < 2) {
        rrd_set_error("Not enough arguments");
        return -1;
    }

    return rrd_update_r(argv[optind], tmplt,
                        argc - optind - 1,
                        (const char **) (argv + optind + 1));
}
230 int rrd_update_r(
231 const char *filename,
232 const char *tmplt,
233 int argc,
234 const char **argv)
235 {
236 return _rrd_update(filename, tmplt, argc, argv, NULL);
237 }
239 int _rrd_update(
240 const char *filename,
241 const char *tmplt,
242 int argc,
243 const char **argv,
244 info_t *pcdp_summary)
245 {
247 int arg_i = 2;
248 short j;
249 unsigned long i, ii, iii = 1;
251 unsigned long rra_begin; /* byte pointer to the rra
252 * area in the rrd file. this
253 * pointer never changes value */
254 unsigned long rra_start; /* byte pointer to the rra
255 * area in the rrd file. this
256 * pointer changes as each rrd is
257 * processed. */
258 unsigned long rra_current; /* byte pointer to the current write
259 * spot in the rrd file. */
260 unsigned long rra_pos_tmp; /* temporary byte pointer. */
261 double interval, pre_int, post_int; /* interval between this and
262 * the last run */
263 unsigned long proc_pdp_st; /* which pdp_st was the last
264 * to be processed */
265 unsigned long occu_pdp_st; /* when was the pdp_st
266 * before the last update
267 * time */
268 unsigned long proc_pdp_age; /* how old was the data in
269 * the pdp prep area when it
270 * was last updated */
271 unsigned long occu_pdp_age; /* how long ago was the last
272 * pdp_step time */
273 rrd_value_t *pdp_new; /* prepare the incoming data
274 * to be added the the
275 * existing entry */
276 rrd_value_t *pdp_temp; /* prepare the pdp values
277 * to be added the the
278 * cdp values */
280 long *tmpl_idx; /* index representing the settings
281 transported by the tmplt index */
282 unsigned long tmpl_cnt = 2; /* time and data */
284 rrd_t rrd;
285 time_t current_time = 0;
286 time_t rra_time = 0; /* time of update for a RRA */
287 unsigned long current_time_usec = 0; /* microseconds part of current time */
288 struct timeval tmp_time; /* used for time conversion */
290 char **updvals;
291 int schedule_smooth = 0;
292 rrd_value_t *seasonal_coef = NULL, *last_seasonal_coef = NULL;
294 /* a vector of future Holt-Winters seasonal coefs */
295 unsigned long elapsed_pdp_st;
297 /* number of elapsed PDP steps since last update */
298 unsigned long *rra_step_cnt = NULL;
300 /* number of rows to be updated in an RRA for a data
301 * value. */
302 unsigned long start_pdp_offset;
304 /* number of PDP steps since the last update that
305 * are assigned to the first CDP to be generated
306 * since the last update. */
307 unsigned short scratch_idx;
309 /* index into the CDP scratch array */
310 enum cf_en current_cf;
312 /* numeric id of the current consolidation function */
313 rpnstack_t rpnstack; /* used for COMPUTE DS */
314 int version; /* rrd version */
315 char *endptr; /* used in the conversion */
316 rrd_file_t *rrd_file;
318 rpnstack_init(&rpnstack);
320 /* need at least 1 arguments: data. */
321 if (argc < 1) {
322 rrd_set_error("Not enough arguments");
323 goto err_out;
324 }
326 rrd_file = rrd_open(filename, &rrd, RRD_READWRITE);
327 if (rrd_file == NULL) {
328 goto err_free;
329 }
330 /* We are now at the beginning of the rra's */
331 rra_current = rra_start = rra_begin = rrd_file->header_len;
333 /* initialize time */
334 version = atoi(rrd.stat_head->version);
335 gettimeofday(&tmp_time, 0);
336 normalize_time(&tmp_time);
337 current_time = tmp_time.tv_sec;
338 if (version >= 3) {
339 current_time_usec = tmp_time.tv_usec;
340 } else {
341 current_time_usec = 0;
342 }
344 /* get exclusive lock to whole file.
345 * lock gets removed when we close the file.
346 */
347 if (LockRRD(rrd_file->fd) != 0) {
348 rrd_set_error("could not lock RRD");
349 goto err_close;
350 }
352 if ((updvals =
353 malloc(sizeof(char *) * (rrd.stat_head->ds_cnt + 1))) == NULL) {
354 rrd_set_error("allocating updvals pointer array");
355 goto err_close;
356 }
358 if ((pdp_temp = malloc(sizeof(rrd_value_t)
359 * rrd.stat_head->ds_cnt)) == NULL) {
360 rrd_set_error("allocating pdp_temp ...");
361 goto err_free_updvals;
362 }
364 if ((tmpl_idx = malloc(sizeof(unsigned long)
365 * (rrd.stat_head->ds_cnt + 1))) == NULL) {
366 rrd_set_error("allocating tmpl_idx ...");
367 goto err_free_pdp_temp;
368 }
369 /* initialize tmplt redirector */
370 /* default config example (assume DS 1 is a CDEF DS)
371 tmpl_idx[0] -> 0; (time)
372 tmpl_idx[1] -> 1; (DS 0)
373 tmpl_idx[2] -> 3; (DS 2)
374 tmpl_idx[3] -> 4; (DS 3) */
375 tmpl_idx[0] = 0; /* time */
376 for (i = 1, ii = 1; i <= rrd.stat_head->ds_cnt; i++) {
377 if (dst_conv(rrd.ds_def[i - 1].dst) != DST_CDEF)
378 tmpl_idx[ii++] = i;
379 }
380 tmpl_cnt = ii;
382 if (tmplt) {
383 /* we should work on a writeable copy here */
384 char *dsname;
385 unsigned int tmpl_len;
386 char *tmplt_copy = strdup(tmplt);
388 dsname = tmplt_copy;
389 tmpl_cnt = 1; /* the first entry is the time */
390 tmpl_len = strlen(tmplt_copy);
391 for (i = 0; i <= tmpl_len; i++) {
392 if (tmplt_copy[i] == ':' || tmplt_copy[i] == '\0') {
393 tmplt_copy[i] = '\0';
394 if (tmpl_cnt > rrd.stat_head->ds_cnt) {
395 rrd_set_error
396 ("tmplt contains more DS definitions than RRD");
397 goto err_free_tmpl_idx;
398 }
399 if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd, dsname)) == -1) {
400 rrd_set_error("unknown DS name '%s'", dsname);
401 goto err_free_tmpl_idx;
402 } else {
403 /* the first element is always the time */
404 tmpl_idx[tmpl_cnt - 1]++;
405 /* go to the next entry on the tmplt_copy */
406 dsname = &tmplt_copy[i + 1];
407 /* fix the damage we did before */
408 if (i < tmpl_len) {
409 tmplt_copy[i] = ':';
410 }
412 }
413 }
414 }
415 free(tmplt_copy);
416 }
417 if ((pdp_new = malloc(sizeof(rrd_value_t)
418 * rrd.stat_head->ds_cnt)) == NULL) {
419 rrd_set_error("allocating pdp_new ...");
420 goto err_free_tmpl_idx;
421 }
422 /* loop through the arguments. */
423 for (arg_i = 0; arg_i < argc; arg_i++) {
424 char *stepper = strdup(argv[arg_i]);
425 char *step_start = stepper;
426 char *p;
427 char *parsetime_error = NULL;
428 enum { atstyle, normal } timesyntax;
429 struct rrd_time_value ds_tv;
431 if (stepper == NULL) {
432 rrd_set_error("failed duplication argv entry");
433 free(step_start);
434 goto err_free_pdp_new;
435 }
436 /* initialize all ds input to unknown except the first one
437 which has always got to be set */
438 for (ii = 1; ii <= rrd.stat_head->ds_cnt; ii++)
439 updvals[ii] = "U";
440 updvals[0] = stepper;
441 /* separate all ds elements; first must be examined separately
442 due to alternate time syntax */
443 if ((p = strchr(stepper, '@')) != NULL) {
444 timesyntax = atstyle;
445 *p = '\0';
446 stepper = p + 1;
447 } else if ((p = strchr(stepper, ':')) != NULL) {
448 timesyntax = normal;
449 *p = '\0';
450 stepper = p + 1;
451 } else {
452 rrd_set_error
453 ("expected timestamp not found in data source from %s",
454 argv[arg_i]);
455 free(step_start);
456 break;
457 }
458 ii = 1;
459 updvals[tmpl_idx[ii]] = stepper;
460 while (*stepper) {
461 if (*stepper == ':') {
462 *stepper = '\0';
463 ii++;
464 if (ii < tmpl_cnt) {
465 updvals[tmpl_idx[ii]] = stepper + 1;
466 }
467 }
468 stepper++;
469 }
471 if (ii != tmpl_cnt - 1) {
472 rrd_set_error
473 ("expected %lu data source readings (got %lu) from %s",
474 tmpl_cnt - 1, ii, argv[arg_i]);
475 free(step_start);
476 break;
477 }
479 /* get the time from the reading ... handle N */
480 if (timesyntax == atstyle) {
481 if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
482 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error);
483 free(step_start);
484 break;
485 }
486 if (ds_tv.type == RELATIVE_TO_END_TIME ||
487 ds_tv.type == RELATIVE_TO_START_TIME) {
488 rrd_set_error("specifying time relative to the 'start' "
489 "or 'end' makes no sense here: %s", updvals[0]);
490 free(step_start);
491 break;
492 }
494 current_time = mktime(&ds_tv.tm) + ds_tv.offset;
496 current_time_usec = 0; /* FIXME: how to handle usecs here ? */
498 } else if (strcmp(updvals[0], "N") == 0) {
499 gettimeofday(&tmp_time, 0);
500 normalize_time(&tmp_time);
501 current_time = tmp_time.tv_sec;
502 current_time_usec = tmp_time.tv_usec;
503 } else {
504 double tmp;
505 char *old_locale;
506 old_locale = setlocale(LC_NUMERIC,"C");
507 tmp = strtod(updvals[0], 0);
508 setlocale(LC_NUMERIC,old_locale);
509 current_time = floor(tmp);
510 current_time_usec =
511 (long) ((tmp - (double) current_time) * 1000000.0);
512 }
513 /* dont do any correction for old version RRDs */
514 if (version < 3)
515 current_time_usec = 0;
517 if (current_time < rrd.live_head->last_up ||
518 (current_time == rrd.live_head->last_up &&
519 (long) current_time_usec <=
520 (long) rrd.live_head->last_up_usec)) {
521 rrd_set_error("illegal attempt to update using time %ld when "
522 "last update time is %ld (minimum one second step)",
523 current_time, rrd.live_head->last_up);
524 free(step_start);
525 break;
526 }
528 /* seek to the beginning of the rra's */
529 if (rra_current != rra_begin) {
530 #ifndef HAVE_MMAP
531 if (rrd_seek(rrd_file, rra_begin, SEEK_SET) != 0) {
532 rrd_set_error("seek error in rrd");
533 free(step_start);
534 break;
535 }
536 #endif
537 rra_current = rra_begin;
538 }
539 rra_start = rra_begin;
541 /* when was the current pdp started */
542 proc_pdp_age = rrd.live_head->last_up % rrd.stat_head->pdp_step;
543 proc_pdp_st = rrd.live_head->last_up - proc_pdp_age;
545 /* when did the last pdp_st occur */
546 occu_pdp_age = current_time % rrd.stat_head->pdp_step;
547 occu_pdp_st = current_time - occu_pdp_age;
549 /* interval = current_time - rrd.live_head->last_up; */
550 interval = (double) (current_time - rrd.live_head->last_up)
551 + (double) ((long) current_time_usec -
552 (long) rrd.live_head->last_up_usec) / 1000000.0;
554 if (occu_pdp_st > proc_pdp_st) {
555 /* OK we passed the pdp_st moment */
556 pre_int = (long) occu_pdp_st - rrd.live_head->last_up; /* how much of the input data
557 * occurred before the latest
558 * pdp_st moment*/
559 pre_int -= ((double) rrd.live_head->last_up_usec) / 1000000.0; /* adjust usecs */
560 post_int = occu_pdp_age; /* how much after it */
561 post_int += ((double) current_time_usec) / 1000000.0; /* adjust usecs */
562 } else {
563 pre_int = interval;
564 post_int = 0;
565 }
567 #ifdef DEBUG
568 printf("proc_pdp_age %lu\t"
569 "proc_pdp_st %lu\t"
570 "occu_pfp_age %lu\t"
571 "occu_pdp_st %lu\t"
572 "int %lf\t"
573 "pre_int %lf\t"
574 "post_int %lf\n", proc_pdp_age, proc_pdp_st,
575 occu_pdp_age, occu_pdp_st, interval, pre_int, post_int);
576 #endif
578 /* process the data sources and update the pdp_prep
579 * area accordingly */
580 for (i = 0; i < rrd.stat_head->ds_cnt; i++) {
581 enum dst_en dst_idx;
583 dst_idx = dst_conv(rrd.ds_def[i].dst);
585 /* make sure we do not build diffs with old last_ds values */
586 if (rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt < interval) {
587 strncpy(rrd.pdp_prep[i].last_ds, "U", LAST_DS_LEN - 1);
588 rrd.pdp_prep[i].last_ds[LAST_DS_LEN - 1] = '\0';
589 }
591 /* NOTE: DST_CDEF should never enter this if block, because
592 * updvals[i+1][0] is initialized to 'U'; unless the caller
593 * accidently specified a value for the DST_CDEF. To handle
594 * this case, an extra check is required. */
596 if ((updvals[i + 1][0] != 'U') &&
597 (dst_idx != DST_CDEF) &&
598 rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {
599 double rate = DNAN;
600 char *old_locale;
602 /* the data source type defines how to process the data */
603 /* pdp_new contains rate * time ... eg the bytes
604 * transferred during the interval. Doing it this way saves
605 * a lot of math operations */
606 switch (dst_idx) {
607 case DST_COUNTER:
608 case DST_DERIVE:
609 if (rrd.pdp_prep[i].last_ds[0] != 'U') {
610 for (ii = 0; updvals[i + 1][ii] != '\0'; ii++) {
611 if ((updvals[i + 1][ii] < '0'
612 || updvals[i + 1][ii] > '9') && (ii != 0
613 && updvals[i
614 +
615 1]
616 [ii] !=
617 '-')) {
618 rrd_set_error("not a simple integer: '%s'",
619 updvals[i + 1]);
620 break;
621 }
622 }
623 if (rrd_test_error()) {
624 break;
625 }
626 pdp_new[i] =
627 rrd_diff(updvals[i + 1], rrd.pdp_prep[i].last_ds);
628 if (dst_idx == DST_COUNTER) {
629 /* simple overflow catcher suggested by Andres Kroonmaa */
630 /* this will fail terribly for non 32 or 64 bit counters ... */
631 /* are there any others in SNMP land ? */
632 if (pdp_new[i] < (double) 0.0)
633 pdp_new[i] += (double) 4294967296.0; /* 2^32 */
634 if (pdp_new[i] < (double) 0.0)
635 pdp_new[i] += (double) 18446744069414584320.0;
636 /* 2^64-2^32 */ ;
637 }
638 rate = pdp_new[i] / interval;
639 } else {
640 pdp_new[i] = DNAN;
641 }
642 break;
643 case DST_ABSOLUTE:
644 old_locale = setlocale(LC_NUMERIC,"C");
645 errno = 0;
646 pdp_new[i] = strtod(updvals[i + 1], &endptr);
647 setlocale(LC_NUMERIC,old_locale);
648 if (errno > 0) {
649 rrd_set_error("converting '%s' to float: %s",
650 updvals[i + 1], rrd_strerror(errno));
651 break;
652 };
653 if (endptr[0] != '\0') {
654 rrd_set_error
655 ("conversion of '%s' to float not complete: tail '%s'",
656 updvals[i + 1], endptr);
657 break;
658 }
659 rate = pdp_new[i] / interval;
660 break;
661 case DST_GAUGE:
662 errno = 0;
663 old_locale = setlocale(LC_NUMERIC,"C");
664 pdp_new[i] = strtod(updvals[i + 1], &endptr) * interval;
665 setlocale(LC_NUMERIC,old_locale);
666 if (errno > 0) {
667 rrd_set_error("converting '%s' to float: %s",
668 updvals[i + 1], rrd_strerror(errno));
669 break;
670 };
671 if (endptr[0] != '\0') {
672 rrd_set_error
673 ("conversion of '%s' to float not complete: tail '%s'",
674 updvals[i + 1], endptr);
675 break;
676 }
677 rate = pdp_new[i] / interval;
678 break;
679 default:
680 rrd_set_error("rrd contains unknown DS type : '%s'",
681 rrd.ds_def[i].dst);
682 break;
683 }
684 /* break out of this for loop if the error string is set */
685 if (rrd_test_error()) {
686 break;
687 }
688 /* make sure pdp_temp is neither too large or too small
689 * if any of these occur it becomes unknown ...
690 * sorry folks ... */
691 if (!isnan(rate) &&
692 ((!isnan(rrd.ds_def[i].par[DS_max_val].u_val) &&
693 rate > rrd.ds_def[i].par[DS_max_val].u_val) ||
694 (!isnan(rrd.ds_def[i].par[DS_min_val].u_val) &&
695 rate < rrd.ds_def[i].par[DS_min_val].u_val))) {
696 pdp_new[i] = DNAN;
697 }
698 } else {
699 /* no news is news all the same */
700 pdp_new[i] = DNAN;
701 }
704 /* make a copy of the command line argument for the next run */
705 #ifdef DEBUG
706 fprintf(stderr,
707 "prep ds[%lu]\t"
708 "last_arg '%s'\t"
709 "this_arg '%s'\t"
710 "pdp_new %10.2f\n",
711 i, rrd.pdp_prep[i].last_ds, updvals[i + 1], pdp_new[i]);
712 #endif
713 strncpy(rrd.pdp_prep[i].last_ds, updvals[i + 1], LAST_DS_LEN - 1);
714 rrd.pdp_prep[i].last_ds[LAST_DS_LEN - 1] = '\0';
715 }
716 /* break out of the argument parsing loop if the error_string is set */
717 if (rrd_test_error()) {
718 free(step_start);
719 break;
720 }
721 /* has a pdp_st moment occurred since the last run ? */
723 if (proc_pdp_st == occu_pdp_st) {
724 /* no we have not passed a pdp_st moment. therefore update is simple */
726 for (i = 0; i < rrd.stat_head->ds_cnt; i++) {
727 if (isnan(pdp_new[i])) {
728 /* this is not realy accurate if we use subsecond data arival time
729 should have thought of it when going subsecond resolution ...
730 sorry next format change we will have it! */
731 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt +=
732 floor(interval);
733 } else {
734 if (isnan(rrd.pdp_prep[i].scratch[PDP_val].u_val)) {
735 rrd.pdp_prep[i].scratch[PDP_val].u_val = pdp_new[i];
736 } else {
737 rrd.pdp_prep[i].scratch[PDP_val].u_val += pdp_new[i];
738 }
739 }
740 #ifdef DEBUG
741 fprintf(stderr,
742 "NO PDP ds[%lu]\t"
743 "value %10.2f\t"
744 "unkn_sec %5lu\n",
745 i,
746 rrd.pdp_prep[i].scratch[PDP_val].u_val,
747 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
748 #endif
749 }
750 } else {
751 /* an pdp_st has occurred. */
753 /* in pdp_prep[].scratch[PDP_val].u_val we have collected
754 rate*seconds which occurred up to the last run.
755 pdp_new[] contains rate*seconds from the latest run.
756 pdp_temp[] will contain the rate for cdp */
758 for (i = 0; i < rrd.stat_head->ds_cnt; i++) {
759 /* update pdp_prep to the current pdp_st. */
760 double pre_unknown = 0.0;
762 if (isnan(pdp_new[i])) {
763 /* a final bit of unkonwn to be added bevore calculation
764 we use a temporary variable for this so that we
765 don't have to turn integer lines before using the value */
766 pre_unknown = pre_int;
767 } else {
768 if (isnan(rrd.pdp_prep[i].scratch[PDP_val].u_val)) {
769 rrd.pdp_prep[i].scratch[PDP_val].u_val =
770 pdp_new[i] / interval * pre_int;
771 } else {
772 rrd.pdp_prep[i].scratch[PDP_val].u_val +=
773 pdp_new[i] / interval * pre_int;
774 }
775 }
778 /* if too much of the pdp_prep is unknown we dump it */
779 if (
780 /* removed because this does not agree with the
781 definition that a heartbeat can be unknown */
782 /* (rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt
783 > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) || */
784 /* if the interval is larger thatn mrhb we get NAN */
785 (interval > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) ||
786 (occu_pdp_st - proc_pdp_st <=
787 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)) {
788 pdp_temp[i] = DNAN;
789 } else {
790 pdp_temp[i] = rrd.pdp_prep[i].scratch[PDP_val].u_val
791 / ((double) (occu_pdp_st - proc_pdp_st
792 -
793 rrd.pdp_prep[i].
794 scratch[PDP_unkn_sec_cnt].u_cnt)
795 - pre_unknown);
796 }
798 /* process CDEF data sources; remember each CDEF DS can
799 * only reference other DS with a lower index number */
800 if (dst_conv(rrd.ds_def[i].dst) == DST_CDEF) {
801 rpnp_t *rpnp;
803 rpnp =
804 rpn_expand((rpn_cdefds_t *) &
805 (rrd.ds_def[i].par[DS_cdef]));
806 /* substitue data values for OP_VARIABLE nodes */
807 for (ii = 0; rpnp[ii].op != OP_END; ii++) {
808 if (rpnp[ii].op == OP_VARIABLE) {
809 rpnp[ii].op = OP_NUMBER;
810 rpnp[ii].val = pdp_temp[rpnp[ii].ptr];
811 }
812 }
813 /* run the rpn calculator */
814 if (rpn_calc(rpnp, &rpnstack, 0, pdp_temp, i) == -1) {
815 free(rpnp);
816 break; /* exits the data sources pdp_temp loop */
817 }
818 }
820 /* make pdp_prep ready for the next run */
821 if (isnan(pdp_new[i])) {
822 /* this is not realy accurate if we use subsecond data arival time
823 should have thought of it when going subsecond resolution ...
824 sorry next format change we will have it! */
825 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt =
826 floor(post_int);
827 rrd.pdp_prep[i].scratch[PDP_val].u_val = DNAN;
828 } else {
829 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = 0;
830 rrd.pdp_prep[i].scratch[PDP_val].u_val =
831 pdp_new[i] / interval * post_int;
832 }
834 #ifdef DEBUG
835 fprintf(stderr,
836 "PDP UPD ds[%lu]\t"
837 "pdp_temp %10.2f\t"
838 "new_prep %10.2f\t"
839 "new_unkn_sec %5lu\n",
840 i, pdp_temp[i],
841 rrd.pdp_prep[i].scratch[PDP_val].u_val,
842 rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
843 #endif
844 }
846 /* if there were errors during the last loop, bail out here */
847 if (rrd_test_error()) {
848 free(step_start);
849 break;
850 }
852 /* compute the number of elapsed pdp_st moments */
853 elapsed_pdp_st =
854 (occu_pdp_st - proc_pdp_st) / rrd.stat_head->pdp_step;
855 #ifdef DEBUG
856 fprintf(stderr, "elapsed PDP steps: %lu\n", elapsed_pdp_st);
857 #endif
858 if (rra_step_cnt == NULL) {
859 rra_step_cnt = (unsigned long *)
860 malloc((rrd.stat_head->rra_cnt) * sizeof(unsigned long));
861 }
863 for (i = 0, rra_start = rra_begin;
864 i < rrd.stat_head->rra_cnt;
865 rra_start +=
866 rrd.rra_def[i].row_cnt * rrd.stat_head->ds_cnt *
867 sizeof(rrd_value_t), i++) {
868 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
869 start_pdp_offset = rrd.rra_def[i].pdp_cnt -
870 (proc_pdp_st / rrd.stat_head->pdp_step) %
871 rrd.rra_def[i].pdp_cnt;
872 if (start_pdp_offset <= elapsed_pdp_st) {
873 rra_step_cnt[i] = (elapsed_pdp_st - start_pdp_offset) /
874 rrd.rra_def[i].pdp_cnt + 1;
875 } else {
876 rra_step_cnt[i] = 0;
877 }
879 if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL) {
880 /* If this is a bulk update, we need to skip ahead in
881 the seasonal arrays so that they will be correct for
882 the next observed value;
883 note that for the bulk update itself, no update will
884 occur to DEVSEASONAL or SEASONAL; futhermore, HWPREDICT
885 and DEVPREDICT will be set to DNAN. */
886 if (rra_step_cnt[i] > 2) {
887 /* skip update by resetting rra_step_cnt[i],
888 note that this is not data source specific; this is
889 due to the bulk update, not a DNAN value for the
890 specific data source. */
891 rra_step_cnt[i] = 0;
892 lookup_seasonal(&rrd, i, rra_start, rrd_file,
893 elapsed_pdp_st, &last_seasonal_coef);
894 lookup_seasonal(&rrd, i, rra_start, rrd_file,
895 elapsed_pdp_st + 1, &seasonal_coef);
896 }
898 /* periodically run a smoother for seasonal effects */
899 /* Need to use first cdp parameter buffer to track
900 * burnin (burnin requires a specific smoothing schedule).
901 * The CDP_init_seasonal parameter is really an RRA level,
902 * not a data source within RRA level parameter, but the rra_def
903 * is read only for rrd_update (not flushed to disk). */
904 iii = i * (rrd.stat_head->ds_cnt);
905 if (rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt
906 <= BURNIN_CYCLES) {
907 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
908 > rrd.rra_def[i].row_cnt - 1) {
909 /* mark off one of the burnin cycles */
910 ++(rrd.cdp_prep[iii].scratch[CDP_init_seasonal].
911 u_cnt);
912 schedule_smooth = 1;
913 }
914 } else {
915 /* someone has no doubt invented a trick to deal with this
916 * wrap around, but at least this code is clear. */
917 if (rrd.rra_def[i].par[RRA_seasonal_smooth_idx].
918 u_cnt > rrd.rra_ptr[i].cur_row) {
919 /* here elapsed_pdp_st = rra_step_cnt[i] because of 1-1
920 * mapping between PDP and CDP */
921 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
922 >=
923 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].
924 u_cnt) {
925 #ifdef DEBUG
926 fprintf(stderr,
927 "schedule_smooth 1: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
928 rrd.rra_ptr[i].cur_row,
929 elapsed_pdp_st,
930 rrd.rra_def[i].
931 par[RRA_seasonal_smooth_idx].u_cnt);
932 #endif
933 schedule_smooth = 1;
934 }
935 } else {
936 /* can't rely on negative numbers because we are working with
937 * unsigned values */
938 /* Don't need modulus here. If we've wrapped more than once, only
939 * one smooth is executed at the end. */
940 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st >=
941 rrd.rra_def[i].row_cnt
942 && rrd.rra_ptr[i].cur_row + elapsed_pdp_st -
943 rrd.rra_def[i].row_cnt >=
944 rrd.rra_def[i].par[RRA_seasonal_smooth_idx].
945 u_cnt) {
946 #ifdef DEBUG
947 fprintf(stderr,
948 "schedule_smooth 2: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
949 rrd.rra_ptr[i].cur_row,
950 elapsed_pdp_st,
951 rrd.rra_def[i].
952 par[RRA_seasonal_smooth_idx].u_cnt);
953 #endif
954 schedule_smooth = 1;
955 }
956 }
957 }
959 rra_current = rrd_tell(rrd_file);
960 }
961 /* if cf is DEVSEASONAL or SEASONAL */
962 if (rrd_test_error())
963 break;
965 /* update CDP_PREP areas */
966 /* loop over data soures within each RRA */
967 for (ii = 0; ii < rrd.stat_head->ds_cnt; ii++) {
969 /* iii indexes the CDP prep area for this data source within the RRA */
970 iii = i * rrd.stat_head->ds_cnt + ii;
972 if (rrd.rra_def[i].pdp_cnt > 1) {
974 if (rra_step_cnt[i] > 0) {
975 /* If we are in this block, as least 1 CDP value will be written to
976 * disk, this is the CDP_primary_val entry. If more than 1 value needs
977 * to be written, then the "fill in" value is the CDP_secondary_val
978 * entry. */
979 if (isnan(pdp_temp[ii])) {
980 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
981 u_cnt += start_pdp_offset;
982 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
983 u_val = DNAN;
984 } else {
985 /* CDP_secondary value is the RRA "fill in" value for intermediary
986 * CDP data entries. No matter the CF, the value is the same because
987 * the average, max, min, and last of a list of identical values is
988 * the same, namely, the value itself. */
989 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
990 u_val = pdp_temp[ii];
991 }
993 if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
994 u_cnt >
995 rrd.rra_def[i].pdp_cnt *
996 rrd.rra_def[i].par[RRA_cdp_xff_val].u_val) {
997 rrd.cdp_prep[iii].scratch[CDP_primary_val].
998 u_val = DNAN;
999 /* initialize carry over */
1000 if (current_cf == CF_AVERAGE) {
1001 if (isnan(pdp_temp[ii])) {
1002 rrd.cdp_prep[iii].scratch[CDP_val].
1003 u_val = DNAN;
1004 } else {
1005 rrd.cdp_prep[iii].scratch[CDP_val].
1006 u_val =
1007 pdp_temp[ii] *
1008 ((elapsed_pdp_st -
1009 start_pdp_offset) %
1010 rrd.rra_def[i].pdp_cnt);
1011 }
1012 } else {
1013 rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1014 pdp_temp[ii];
1015 }
1016 } else {
1017 rrd_value_t cum_val, cur_val;
1019 switch (current_cf) {
1020 case CF_AVERAGE:
1021 cum_val =
1022 IFDNAN(rrd.cdp_prep[iii].
1023 scratch[CDP_val].u_val, 0.0);
1024 cur_val = IFDNAN(pdp_temp[ii], 0.0);
1025 rrd.cdp_prep[iii].
1026 scratch[CDP_primary_val].u_val =
1027 (cum_val +
1028 cur_val * start_pdp_offset) /
1029 (rrd.rra_def[i].pdp_cnt -
1030 rrd.cdp_prep[iii].
1031 scratch[CDP_unkn_pdp_cnt].u_cnt);
1032 /* initialize carry over value */
1033 if (isnan(pdp_temp[ii])) {
1034 rrd.cdp_prep[iii].scratch[CDP_val].
1035 u_val = DNAN;
1036 } else {
1037 rrd.cdp_prep[iii].scratch[CDP_val].
1038 u_val =
1039 pdp_temp[ii] *
1040 ((elapsed_pdp_st -
1041 start_pdp_offset) %
1042 rrd.rra_def[i].pdp_cnt);
1043 }
1044 break;
1045 case CF_MAXIMUM:
1046 cum_val =
1047 IFDNAN(rrd.cdp_prep[iii].
1048 scratch[CDP_val].u_val, -DINF);
1049 cur_val = IFDNAN(pdp_temp[ii], -DINF);
1050 #ifdef DEBUG
1051 if (isnan
1052 (rrd.cdp_prep[iii].scratch[CDP_val].
1053 u_val) && isnan(pdp_temp[ii])) {
1054 fprintf(stderr,
1055 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1056 i, ii);
1057 exit(-1);
1058 }
1059 #endif
1060 if (cur_val > cum_val)
1061 rrd.cdp_prep[iii].
1062 scratch[CDP_primary_val].u_val =
1063 cur_val;
1064 else
1065 rrd.cdp_prep[iii].
1066 scratch[CDP_primary_val].u_val =
1067 cum_val;
1068 /* initialize carry over value */
1069 rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1070 pdp_temp[ii];
1071 break;
1072 case CF_MINIMUM:
1073 cum_val =
1074 IFDNAN(rrd.cdp_prep[iii].
1075 scratch[CDP_val].u_val, DINF);
1076 cur_val = IFDNAN(pdp_temp[ii], DINF);
1077 #ifdef DEBUG
1078 if (isnan
1079 (rrd.cdp_prep[iii].scratch[CDP_val].
1080 u_val) && isnan(pdp_temp[ii])) {
1081 fprintf(stderr,
1082 "RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1083 i, ii);
1084 exit(-1);
1085 }
1086 #endif
1087 if (cur_val < cum_val)
1088 rrd.cdp_prep[iii].
1089 scratch[CDP_primary_val].u_val =
1090 cur_val;
1091 else
1092 rrd.cdp_prep[iii].
1093 scratch[CDP_primary_val].u_val =
1094 cum_val;
1095 /* initialize carry over value */
1096 rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1097 pdp_temp[ii];
1098 break;
1099 case CF_LAST:
1100 default:
1101 rrd.cdp_prep[iii].
1102 scratch[CDP_primary_val].u_val =
1103 pdp_temp[ii];
1104 /* initialize carry over value */
1105 rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1106 pdp_temp[ii];
1107 break;
1108 }
1109 } /* endif meets xff value requirement for a valid value */
1110 /* initialize carry over CDP_unkn_pdp_cnt; this must happen after CDP_primary_val
1111 * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1112 if (isnan(pdp_temp[ii]))
1113 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
1114 u_cnt =
1115 (elapsed_pdp_st -
1116 start_pdp_offset) %
1117 rrd.rra_def[i].pdp_cnt;
1118 else
1119 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
1120 u_cnt = 0;
1121 } else { /* rra_step_cnt[i] == 0 */
1123 #ifdef DEBUG
1124 if (isnan
1125 (rrd.cdp_prep[iii].scratch[CDP_val].u_val)) {
1126 fprintf(stderr,
1127 "schedule CDP_val update, RRA %lu DS %lu, DNAN\n",
1128 i, ii);
1129 } else {
1130 fprintf(stderr,
1131 "schedule CDP_val update, RRA %lu DS %lu, %10.2f\n",
1132 i, ii,
1133 rrd.cdp_prep[iii].scratch[CDP_val].
1134 u_val);
1135 }
1136 #endif
1137 if (isnan(pdp_temp[ii])) {
1138 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].
1139 u_cnt += elapsed_pdp_st;
1140 } else
1141 if (isnan
1142 (rrd.cdp_prep[iii].scratch[CDP_val].
1143 u_val)) {
1144 if (current_cf == CF_AVERAGE) {
1145 rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1146 pdp_temp[ii] * elapsed_pdp_st;
1147 } else {
1148 rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1149 pdp_temp[ii];
1150 }
1151 #ifdef DEBUG
1152 fprintf(stderr,
1153 "Initialize CDP_val for RRA %lu DS %lu: %10.2f\n",
1154 i, ii,
1155 rrd.cdp_prep[iii].scratch[CDP_val].
1156 u_val);
1157 #endif
1158 } else {
1159 switch (current_cf) {
1160 case CF_AVERAGE:
1161 rrd.cdp_prep[iii].scratch[CDP_val].
1162 u_val +=
1163 pdp_temp[ii] * elapsed_pdp_st;
1164 break;
1165 case CF_MINIMUM:
1166 if (pdp_temp[ii] <
1167 rrd.cdp_prep[iii].scratch[CDP_val].
1168 u_val)
1169 rrd.cdp_prep[iii].scratch[CDP_val].
1170 u_val = pdp_temp[ii];
1171 break;
1172 case CF_MAXIMUM:
1173 if (pdp_temp[ii] >
1174 rrd.cdp_prep[iii].scratch[CDP_val].
1175 u_val)
1176 rrd.cdp_prep[iii].scratch[CDP_val].
1177 u_val = pdp_temp[ii];
1178 break;
1179 case CF_LAST:
1180 default:
1181 rrd.cdp_prep[iii].scratch[CDP_val].u_val =
1182 pdp_temp[ii];
1183 break;
1184 }
1185 }
1186 }
1187 } else { /* rrd.rra_def[i].pdp_cnt == 1 */
1188 if (elapsed_pdp_st > 2) {
1189 switch (current_cf) {
1190 case CF_AVERAGE:
1191 default:
1192 rrd.cdp_prep[iii].scratch[CDP_primary_val].
1193 u_val = pdp_temp[ii];
1194 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
1195 u_val = pdp_temp[ii];
1196 break;
1197 case CF_SEASONAL:
1198 case CF_DEVSEASONAL:
1199 /* need to update cached seasonal values, so they are consistent
1200 * with the bulk update */
1201 /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1202 * CDP_last_deviation are the same. */
1203 rrd.cdp_prep[iii].
1204 scratch[CDP_hw_last_seasonal].u_val =
1205 last_seasonal_coef[ii];
1206 rrd.cdp_prep[iii].scratch[CDP_hw_seasonal].
1207 u_val = seasonal_coef[ii];
1208 break;
1209 case CF_HWPREDICT:
1210 case CF_MHWPREDICT:
1211 /* need to update the null_count and last_null_count.
1212 * even do this for non-DNAN pdp_temp because the
1213 * algorithm is not learning from batch updates. */
1214 rrd.cdp_prep[iii].scratch[CDP_null_count].
1215 u_cnt += elapsed_pdp_st;
1216 rrd.cdp_prep[iii].
1217 scratch[CDP_last_null_count].u_cnt +=
1218 elapsed_pdp_st - 1;
1219 /* fall through */
1220 case CF_DEVPREDICT:
1221 rrd.cdp_prep[iii].scratch[CDP_primary_val].
1222 u_val = DNAN;
1223 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
1224 u_val = DNAN;
1225 break;
1226 case CF_FAILURES:
1227 /* do not count missed bulk values as failures */
1228 rrd.cdp_prep[iii].scratch[CDP_primary_val].
1229 u_val = 0;
1230 rrd.cdp_prep[iii].scratch[CDP_secondary_val].
1231 u_val = 0;
1232 /* need to reset violations buffer.
1233 * could do this more carefully, but for now, just
1234 * assume a bulk update wipes away all violations. */
1235 erase_violations(&rrd, iii, i);
1236 break;
1237 }
1238 }
1239 } /* endif rrd.rra_def[i].pdp_cnt == 1 */
1241 if (rrd_test_error())
1242 break;
1244 } /* endif data sources loop */
1245 } /* end RRA Loop */
1247 /* this loop is only entered if elapsed_pdp_st < 3 */
1248 for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1249 j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val) {
1250 for (i = 0, rra_start = rra_begin;
1251 i < rrd.stat_head->rra_cnt;
1252 rra_start +=
1253 rrd.rra_def[i].row_cnt * rrd.stat_head->ds_cnt *
1254 sizeof(rrd_value_t), i++) {
1255 if (rrd.rra_def[i].pdp_cnt > 1)
1256 continue;
1258 current_cf = cf_conv(rrd.rra_def[i].cf_nam);
1259 if (current_cf == CF_SEASONAL
1260 || current_cf == CF_DEVSEASONAL) {
1261 lookup_seasonal(&rrd, i, rra_start, rrd_file,
1262 elapsed_pdp_st + (scratch_idx ==
1263 CDP_primary_val ? 1
1264 : 2),
1265 &seasonal_coef);
1266 rra_current = rrd_tell(rrd_file);
1267 }
1268 if (rrd_test_error())
1269 break;
1270 /* loop over data sources within each RRA */
1271 for (ii = 0; ii < rrd.stat_head->ds_cnt; ii++) {
1272 update_aberrant_CF(&rrd, pdp_temp[ii], current_cf,
1273 i * (rrd.stat_head->ds_cnt) + ii,
1274 i, ii, scratch_idx, seasonal_coef);
1275 }
1276 } /* end RRA Loop */
1277 if (rrd_test_error())
1278 break;
1279 } /* end elapsed_pdp_st loop */
1281 if (rrd_test_error())
1282 break;
1284 /* Ready to write to disk */
1285 /* Move sequentially through the file, writing one RRA at a time.
1286 * Note this architecture divorces the computation of CDP with
1287 * flushing updated RRA entries to disk. */
1288 for (i = 0, rra_start = rra_begin;
1289 i < rrd.stat_head->rra_cnt;
1290 rra_start +=
1291 rrd.rra_def[i].row_cnt * rrd.stat_head->ds_cnt *
1292 sizeof(rrd_value_t), i++) {
1293 /* is there anything to write for this RRA? If not, continue. */
1294 if (rra_step_cnt[i] == 0)
1295 continue;
1297 /* write the first row */
1298 #ifdef DEBUG
1299 fprintf(stderr, " -- RRA Preseek %ld\n", rrd_file->pos);
1300 #endif
1301 rrd.rra_ptr[i].cur_row++;
1302 if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
1303 rrd.rra_ptr[i].cur_row = 0; /* wrap around */
1304 /* position on the first row */
1305 rra_pos_tmp = rra_start +
1306 (rrd.stat_head->ds_cnt) * (rrd.rra_ptr[i].cur_row) *
1307 sizeof(rrd_value_t);
1308 if (rra_pos_tmp != rra_current) {
1309 if (rrd_seek(rrd_file, rra_pos_tmp, SEEK_SET) != 0) {
1310 rrd_set_error("seek error in rrd");
1311 break;
1312 }
1313 rra_current = rra_pos_tmp;
1314 }
1315 #ifdef DEBUG
1316 fprintf(stderr, " -- RRA Postseek %ld\n", rrd_file->pos);
1317 #endif
1318 scratch_idx = CDP_primary_val;
1319 if (pcdp_summary != NULL) {
1320 rra_time = (current_time - current_time
1321 % (rrd.rra_def[i].pdp_cnt *
1322 rrd.stat_head->pdp_step))
1323 -
1324 ((rra_step_cnt[i] -
1325 1) * rrd.rra_def[i].pdp_cnt *
1326 rrd.stat_head->pdp_step);
1327 }
1328 pcdp_summary =
1329 write_RRA_row(rrd_file, &rrd, i, &rra_current,
1330 scratch_idx, pcdp_summary, &rra_time);
1331 if (rrd_test_error())
1332 break;
1334 /* write other rows of the bulk update, if any */
1335 scratch_idx = CDP_secondary_val;
1336 for (; rra_step_cnt[i] > 1; rra_step_cnt[i]--) {
1337 if (++rrd.rra_ptr[i].cur_row == rrd.rra_def[i].row_cnt) {
1338 #ifdef DEBUG
1339 fprintf(stderr,
1340 "Wraparound for RRA %s, %lu updates left\n",
1341 rrd.rra_def[i].cf_nam, rra_step_cnt[i] - 1);
1342 #endif
1343 /* wrap */
1344 rrd.rra_ptr[i].cur_row = 0;
1345 /* seek back to beginning of current rra */
1346 if (rrd_seek(rrd_file, rra_start, SEEK_SET) != 0) {
1347 rrd_set_error("seek error in rrd");
1348 break;
1349 }
1350 #ifdef DEBUG
1351 fprintf(stderr, " -- Wraparound Postseek %ld\n",
1352 rrd_file->pos);
1353 #endif
1354 rra_current = rra_start;
1355 }
1356 if (pcdp_summary != NULL) {
1357 rra_time = (current_time - current_time
1358 % (rrd.rra_def[i].pdp_cnt *
1359 rrd.stat_head->pdp_step))
1360 -
1361 ((rra_step_cnt[i] -
1362 2) * rrd.rra_def[i].pdp_cnt *
1363 rrd.stat_head->pdp_step);
1364 }
1365 pcdp_summary =
1366 write_RRA_row(rrd_file, &rrd, i, &rra_current,
1367 scratch_idx, pcdp_summary, &rra_time);
1368 }
1370 if (rrd_test_error())
1371 break;
1372 } /* RRA LOOP */
1374 /* break out of the argument parsing loop if error_string is set */
1375 if (rrd_test_error()) {
1376 free(step_start);
1377 break;
1378 }
1380 } /* endif a pdp_st has occurred */
1381 rrd.live_head->last_up = current_time;
1382 rrd.live_head->last_up_usec = current_time_usec;
1383 free(step_start);
1384 } /* function argument loop */
1386 if (seasonal_coef != NULL)
1387 free(seasonal_coef);
1388 if (last_seasonal_coef != NULL)
1389 free(last_seasonal_coef);
1390 if (rra_step_cnt != NULL)
1391 free(rra_step_cnt);
1392 rpnstack_free(&rpnstack);
1394 #if 0
1395 //rrd_flush(rrd_file); //XXX: really needed?
1396 #endif
1397 /* if we got here and if there is an error and if the file has not been
1398 * written to, then close things up and return. */
1399 if (rrd_test_error()) {
1400 goto err_free_pdp_new;
1401 }
1403 /* aargh ... that was tough ... so many loops ... anyway, its done.
1404 * we just need to write back the live header portion now*/
1406 if (rrd_seek(rrd_file, (sizeof(stat_head_t)
1407 + sizeof(ds_def_t) * rrd.stat_head->ds_cnt
1408 + sizeof(rra_def_t) * rrd.stat_head->rra_cnt),
1409 SEEK_SET) != 0) {
1410 rrd_set_error("seek rrd for live header writeback");
1411 goto err_free_pdp_new;
1412 }
1413 /* for mmap, we did already write to the underlying mapping, so we do
1414 not need to write again. */
1415 #ifndef HAVE_MMAP
1416 if (version >= 3) {
1417 if (rrd_write(rrd_file, rrd.live_head,
1418 sizeof(live_head_t) * 1) != sizeof(live_head_t) * 1) {
1419 rrd_set_error("rrd_write live_head to rrd");
1420 goto err_free_pdp_new;
1421 }
1422 } else {
1423 if (rrd_write(rrd_file, &rrd.live_head->last_up,
1424 sizeof(time_t) * 1) != sizeof(time_t) * 1) {
1425 rrd_set_error("rrd_write live_head to rrd");
1426 goto err_free_pdp_new;
1427 }
1428 }
1431 if (rrd_write(rrd_file, rrd.pdp_prep,
1432 sizeof(pdp_prep_t) * rrd.stat_head->ds_cnt)
1433 != (ssize_t) (sizeof(pdp_prep_t) * rrd.stat_head->ds_cnt)) {
1434 rrd_set_error("rrd_write pdp_prep to rrd");
1435 goto err_free_pdp_new;
1436 }
1438 if (rrd_write(rrd_file, rrd.cdp_prep,
1439 sizeof(cdp_prep_t) * rrd.stat_head->rra_cnt *
1440 rrd.stat_head->ds_cnt)
1441 != (ssize_t) (sizeof(cdp_prep_t) * rrd.stat_head->rra_cnt *
1442 rrd.stat_head->ds_cnt)) {
1444 rrd_set_error("rrd_write cdp_prep to rrd");
1445 goto err_free_pdp_new;
1446 }
1448 if (rrd_write(rrd_file, rrd.rra_ptr,
1449 sizeof(rra_ptr_t) * rrd.stat_head->rra_cnt)
1450 != (ssize_t) (sizeof(rra_ptr_t) * rrd.stat_head->rra_cnt)) {
1451 rrd_set_error("rrd_write rra_ptr to rrd");
1452 goto err_free_pdp_new;
1453 }
1454 #endif
1456 /* rrd_flush(rrd_file); */
1458 /* calling the smoothing code here guarantees at most
1459 * one smoothing operation per rrd_update call. Unfortunately,
1460 * it is possible with bulk updates, or a long-delayed update
1461 * for smoothing to occur off-schedule. This really isn't
1462 * critical except during the burning cycles. */
1463 if (schedule_smooth) {
1465 rra_start = rra_begin;
1466 for (i = 0; i < rrd.stat_head->rra_cnt; ++i) {
1467 if (cf_conv(rrd.rra_def[i].cf_nam) == CF_DEVSEASONAL ||
1468 cf_conv(rrd.rra_def[i].cf_nam) == CF_SEASONAL) {
1469 #ifdef DEBUG
1470 fprintf(stderr, "Running smoother for rra %ld\n", i);
1471 #endif
1472 apply_smoother(&rrd, i, rra_start, rrd_file);
1473 if (rrd_test_error())
1474 break;
1475 }
1476 rra_start += rrd.rra_def[i].row_cnt
1477 * rrd.stat_head->ds_cnt * sizeof(rrd_value_t);
1478 }
1479 }
1481 /* rrd_dontneed(rrd_file,&rrd); */
1482 rrd_free(&rrd);
1483 rrd_close(rrd_file);
1485 free(pdp_new);
1486 free(tmpl_idx);
1487 free(pdp_temp);
1488 free(updvals);
1489 return (0);
1491 err_free_pdp_new:
1492 free(pdp_new);
1493 err_free_tmpl_idx:
1494 free(tmpl_idx);
1495 err_free_pdp_temp:
1496 free(pdp_temp);
1497 err_free_updvals:
1498 free(updvals);
1499 err_close:
1500 rrd_close(rrd_file);
1501 err_free:
1502 rrd_free(&rrd);
1503 err_out:
1504 return (-1);
1505 }
/*
 * Request an exclusive lock spanning the entire file behind the
 * descriptor in_file.  The request is made with the non-waiting
 * variants of the platform locking calls (F_SETLK / _LK_NBLCK).
 * The lock goes away when the file is closed.
 *
 * Returns 0 on success; otherwise the failure value of the underlying
 * locking call (or -1 when the file size cannot be determined on WIN32).
 */
int LockRRD(
    int in_file)
{
#if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
    struct _stat file_info;

    /* the WIN32 call needs a byte count, so look up the file size first */
    if (_fstat(in_file, &file_info) != 0)
        return (-1);

    return (_locking(in_file, _LK_NBLCK, file_info.st_size));
#else
    struct flock whole_file;

    whole_file.l_whence = SEEK_SET; /* offsets are relative to the start of the file */
    whole_file.l_start = 0; /* begin at the first byte */
    whole_file.l_len = 0;   /* zero length: cover the whole file */
    whole_file.l_type = F_WRLCK;    /* exclusive write lock */

    return (fcntl(in_file, F_SETLK, &whole_file));
#endif
}